PLC Bundle V1 Example Implementations
#!/usr/bin/env python3

"""
plcbundle.py - A compact, readable reference implementation for creating
plcbundle V1 compliant archives. This script demonstrates all critical spec
requirements, including hashing, serialization, ordering, and boundary handling.

PLC Bundle v1 Specification:
https://tangled.org/atscan.net/plcbundle/blob/main/docs/specification.md
"""

import asyncio
import hashlib
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import TypedDict, Self

import httpx
import zstd

# --- Configuration ---
BUNDLE_SIZE = 10000
INDEX_FILE = 'plc_bundles.json'
DEFAULT_DIR = './plc_bundles_py'
PLC_URL = 'https://plc.directory'

# --- Types (as per spec) ---
class PLCOperation(TypedDict):
    did: str
    cid: str
    createdAt: str
    operation: dict
    nullified: bool | str | None
    _raw: str  # Holds the original raw JSON string for reproducibility

class BundleMetadata(TypedDict):
    bundle_number: int
    start_time: str
    end_time: str
    operation_count: int
    did_count: int
    hash: str  # The chain hash
    content_hash: str
    parent: str
    compressed_hash: str
    compressed_size: int
    uncompressed_size: int
    cursor: str
    created_at: str

class Index(TypedDict):
    version: str
    last_bundle: int
    updated_at: str
    total_size_bytes: int
    bundles: list[BundleMetadata]

class PlcBundleManager:
    """
    Manages the state and process of fetching, validating, and creating PLC bundles.
    """
    _index: Index
    _mempool: list[PLCOperation]
    # This set correctly de-duplicates operations, both from the previous bundle's
    # boundary and within new batches, and is pruned to stay memory-efficient.
    _seen_cids: set[str]

    def __init__(self, bundle_dir: str):
        self._bundle_dir = Path(bundle_dir)
        self._http_client = httpx.AsyncClient(timeout=30)
        # Mutable state is created per instance to avoid shared class-level defaults.
        self._mempool = []
        self._seen_cids = set()

    @classmethod
    async def create(cls, bundle_dir: str) -> Self:
        """Factory to create and asynchronously initialize a PlcBundleManager instance."""
        manager = cls(bundle_dir)
        await manager._init()
        return manager

    async def _init(self):
        """
        Initializes the manager by loading the index and seeding the `seen_cids`
        set with the CIDs from the last saved bundle's boundary.
        """
        self._bundle_dir.mkdir(parents=True, exist_ok=True)
        self._index = await self._load_index()
        print(f"plcbundle Reference Implementation\nDirectory: {self._bundle_dir}\n")

        last_bundle = self._index['bundles'][-1] if self._index['bundles'] else None
        if last_bundle:
            print(f"Resuming from bundle {last_bundle['bundle_number'] + 1}. Last op time: {last_bundle['end_time']}")
            try:
                prev_ops = await self._load_bundle_ops(last_bundle['bundle_number'])
                self._seen_cids = self._get_boundary_cids(prev_ops)
                print(f"  Seeded de-duplication set with {len(self._seen_cids)} boundary CIDs.")
            except FileNotFoundError:
                print("  Warning: Could not load previous bundle file. Boundary de-duplication may be incomplete.")
        else:
            print('Starting from the beginning (genesis bundle).')

    async def run(self):
        """
        The main execution loop. It continuously fetches operations, validates and
        de-duplicates them, fills the mempool, and creates bundles when ready.
        """
        last_bundle = self._index['bundles'][-1] if self._index['bundles'] else None
        cursor = last_bundle['end_time'] if last_bundle else None

        while True:
            try:
                print(f"\nFetching operations from cursor: {cursor or 'start'}...")
                fetched_ops = await self._fetch_operations(cursor)
                if not fetched_ops:
                    print('No more operations available from PLC directory.')
                    break

                self._process_and_validate_ops(fetched_ops)
                cursor = fetched_ops[-1]['createdAt']

                while len(self._mempool) >= BUNDLE_SIZE:
                    await self._create_and_save_bundle()

                await asyncio.sleep(0.2)  # Be nice to the server
            except httpx.HTTPStatusError as e:
                print(f"\nError: HTTP {e.response.status_code} - {e.response.text}")
                break
            except Exception as e:
                print(f"\nAn unexpected error occurred: {e}")
                break

        # Close the HTTP client and persist the final index state.
        await self._http_client.aclose()
        await self._save_index()
        print("\n---\nProcess complete.")
        print(f"Total bundles in index: {len(self._index['bundles'])}")
        print(f"Operations in mempool: {len(self._mempool)}")
        total_mb = self._index['total_size_bytes'] / 1024 / 1024
        print(f"Total size: {total_mb:.2f} MB")

    # --- Private Helper Methods ---

    async def _fetch_operations(self, after: str | None) -> list[PLCOperation]:
        params = {'count': 1000}
        if after:
            params['after'] = after

        response = await self._http_client.get(f"{PLC_URL}/export", params=params)
        response.raise_for_status()

        lines = response.text.strip().split('\n')
        if not lines or not lines[0]:
            return []

        # Important: The `_raw` key is added here to preserve the original JSON string,
        # ensuring byte-for-byte reproducibility as required by Spec 4.2.
        return [{**json.loads(line), '_raw': line} for line in lines]

    def _process_and_validate_ops(self, ops: list[PLCOperation]):
        last_op = self._mempool[-1] if self._mempool else None
        last_bundle = self._index['bundles'][-1] if self._index['bundles'] else None
        last_timestamp = last_op['createdAt'] if last_op else (last_bundle['end_time'] if last_bundle else '')

        new_ops_count = 0
        for op in ops:
            if op['cid'] in self._seen_cids:
                continue

            if op['createdAt'] < last_timestamp:
                raise ValueError(f"Chronological validation failed: op {op['cid']} at {op['createdAt']} is older than last op at {last_timestamp}")

            self._mempool.append(op)
            self._seen_cids.add(op['cid'])
            last_timestamp = op['createdAt']
            new_ops_count += 1
        print(f"  Added {new_ops_count} new operations to mempool.")

    async def _create_and_save_bundle(self):
        bundle_ops = self._mempool[:BUNDLE_SIZE]
        self._mempool = self._mempool[BUNDLE_SIZE:]

        last_bundle = self._index['bundles'][-1] if self._index['bundles'] else None
        parent_hash = last_bundle['hash'] if last_bundle else ''

        # Spec 4.2 & 6.3: Hashing and serialization must be exact.
        jsonl_data = "".join([op['_raw'] + '\n' for op in bundle_ops]).encode('utf-8')
        content_hash = hashlib.sha256(jsonl_data).hexdigest()
        chain_hash = self._calculate_chain_hash(parent_hash, content_hash)
        compressed_data = zstd.compress(jsonl_data, 3)

        bundle_number = self._index['last_bundle'] + 1
        filename = f"{bundle_number:06d}.jsonl.zst"
        print(f"\nCreating bundle {filename}...")
        (self._bundle_dir / filename).write_bytes(compressed_data)

        self._index['bundles'].append({
            'bundle_number': bundle_number,
            'start_time': bundle_ops[0]['createdAt'],
            'end_time': bundle_ops[-1]['createdAt'],
            'operation_count': len(bundle_ops),
            'did_count': len({op['did'] for op in bundle_ops}),
            'hash': chain_hash,
            'content_hash': content_hash,
            'parent': parent_hash,
            'compressed_hash': hashlib.sha256(compressed_data).hexdigest(),
            'compressed_size': len(compressed_data),
            'uncompressed_size': len(jsonl_data),
            'cursor': last_bundle['end_time'] if last_bundle else '',
            'created_at': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
        })
        self._index['last_bundle'] = bundle_number
        self._index['total_size_bytes'] += len(compressed_data)

        # Prune `seen_cids` to keep it memory-efficient.
        new_boundary_cids = self._get_boundary_cids(bundle_ops)
        mempool_cids = {op['cid'] for op in self._mempool}
        self._seen_cids = new_boundary_cids.union(mempool_cids)

        await self._save_index()
        print(f"  ✓ Saved. Hash: {chain_hash[:16]}...")
        print(f"  Pruned de-duplication set to {len(self._seen_cids)} CIDs.")

    async def _load_index(self) -> Index:
        try:
            return json.loads((self._bundle_dir / INDEX_FILE).read_text())
        except FileNotFoundError:
            return {'version': '1.0', 'last_bundle': 0, 'updated_at': '', 'total_size_bytes': 0, 'bundles': []}

    async def _save_index(self):
        self._index['updated_at'] = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
        temp_path = self._bundle_dir / f"{INDEX_FILE}.tmp"
        temp_path.write_text(json.dumps(self._index, indent=2))
        # Path.replace() atomically overwrites an existing index (rename() fails on Windows).
        temp_path.replace(self._bundle_dir / INDEX_FILE)

    async def _load_bundle_ops(self, bundle_number: int) -> list[PLCOperation]:
        filename = f"{bundle_number:06d}.jsonl.zst"
        compressed = (self._bundle_dir / filename).read_bytes()
        decompressed = zstd.decompress(compressed).decode('utf-8')
        return [{**json.loads(line), '_raw': line} for line in decompressed.strip().split('\n')]

    # --- Static Utilities ---

    @staticmethod
    def _calculate_chain_hash(parent: str, content_hash: str) -> str:
        data = f"{parent}:{content_hash}" if parent else f"plcbundle:genesis:{content_hash}"
        return hashlib.sha256(data.encode('utf-8')).hexdigest()

    @staticmethod
    def _get_boundary_cids(ops: list[PLCOperation]) -> set[str]:
        if not ops:
            return set()
        last_time = ops[-1]['createdAt']
        return {op['cid'] for op in reversed(ops) if op['createdAt'] == last_time}

async def main():
    """Entry point for the script."""
    dir_path = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DIR
    manager = await PlcBundleManager.create(dir_path)
    await manager.run()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nProcess interrupted by user.")
    except Exception as e:
        print(f"\nFATAL ERROR: {e}", file=sys.stderr)
        sys.exit(1)
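To try the script locally, install its two third-party dependencies and run it with an optional target directory (it defaults to ./plc_bundles_py): pip install httpx zstd, then python plcbundle.py ./plc_bundles_py. Note that the import zstd above assumes the zstd package from PyPI, which exposes module-level compress/decompress functions, rather than the zstandard package.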
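For completeness, the sketch below shows how a consumer could check archives produced by the script above. It is a hypothetical companion (verify_bundles.py is not defined by the spec) and relies only on what plcbundle.py writes: the plc_bundles.json layout, the NNNNNN.jsonl.zst file naming, and the same chain-hash construction used in _calculate_chain_hash.

#!/usr/bin/env python3
"""verify_bundles.py - hypothetical companion sketch that checks archives
written by plcbundle.py: content hashes, compressed hashes, and chain linkage."""

import hashlib
import json
import sys
from pathlib import Path

import zstd

def verify(bundle_dir: str) -> None:
    root = Path(bundle_dir)
    index = json.loads((root / 'plc_bundles.json').read_text())
    expected_parent = ''  # the genesis bundle has an empty parent
    for meta in index['bundles']:
        compressed = (root / f"{meta['bundle_number']:06d}.jsonl.zst").read_bytes()
        # The stored hashes must match what we recompute from the file on disk.
        assert hashlib.sha256(compressed).hexdigest() == meta['compressed_hash']
        raw = zstd.decompress(compressed)
        content_hash = hashlib.sha256(raw).hexdigest()
        assert content_hash == meta['content_hash']
        # Recompute the chain hash exactly as _calculate_chain_hash does above.
        parent = meta['parent']
        data = f"{parent}:{content_hash}" if parent else f"plcbundle:genesis:{content_hash}"
        assert hashlib.sha256(data.encode('utf-8')).hexdigest() == meta['hash']
        # Each bundle must point at the previous bundle's chain hash.
        assert parent == expected_parent
        expected_parent = meta['hash']
        print(f"bundle {meta['bundle_number']:06d} ok")

if __name__ == "__main__":
    verify(sys.argv[1] if len(sys.argv) > 1 else './plc_bundles_py')

Run against the output directory, this prints one "ok" line per bundle; any assertion failure points at the first bundle whose hashes or parent linkage no longer match the index.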