PLC Bundle V1 Example Implementations
#!/usr/bin/env python3

"""
plcbundle.py - A compact, readable reference implementation for creating
plcbundle V1-compliant archives. This script demonstrates all critical spec
requirements, including hashing, serialization, ordering, and boundary handling.

PLC Bundle v1 Specification:
    https://tangled.org/atscan.net/plcbundle/blob/main/docs/specification.md
"""

import asyncio
import hashlib
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import TypedDict, Self

import httpx
import zstd

# --- Configuration ---
BUNDLE_SIZE = 10000
INDEX_FILE = 'plc_bundles.json'
DEFAULT_DIR = './plc_bundles_py'
PLC_URL = 'https://plc.directory'

# --- Types (as per spec) ---
class PLCOperation(TypedDict):
    did: str
    cid: str
    createdAt: str
    operation: dict
    nullified: bool | str | None
    _raw: str  # Holds the original raw JSON string for reproducibility

class BundleMetadata(TypedDict):
    bundle_number: int
    start_time: str
    end_time: str
    operation_count: int
    did_count: int
    hash: str  # The chain hash
    content_hash: str
    parent: str
    compressed_hash: str
    compressed_size: int
    uncompressed_size: int
    cursor: str
    created_at: str

class Index(TypedDict):
    version: str
    last_bundle: int
    updated_at: str
    total_size_bytes: int
    bundles: list[BundleMetadata]

class PlcBundleManager:
    """
    Manages the state and process of fetching, validating, and creating PLC bundles.
    """
    _index: Index

    def __init__(self, bundle_dir: str):
        self._bundle_dir = Path(bundle_dir)
        self._http_client = httpx.AsyncClient(timeout=30)
        self._mempool: list[PLCOperation] = []
        # This set de-duplicates operations, both from the previous bundle's
        # boundary and within new batches, and is pruned to stay memory-efficient.
        self._seen_cids: set[str] = set()

    @classmethod
    async def create(cls, bundle_dir: str) -> Self:
        """Factory to create and asynchronously initialize a PlcBundleManager instance."""
        manager = cls(bundle_dir)
        await manager._init()
        return manager

    async def _init(self):
        """
        Initializes the manager by loading the index and seeding the `seen_cids`
        set with the CIDs from the last saved bundle's boundary.
        """
        self._bundle_dir.mkdir(exist_ok=True)
        self._index = await self._load_index()
        print(f"plcbundle Reference Implementation\nDirectory: {self._bundle_dir}\n")

        last_bundle = self._index['bundles'][-1] if self._index['bundles'] else None
        if last_bundle:
            print(f"Resuming from bundle {last_bundle['bundle_number'] + 1}. Last op time: {last_bundle['end_time']}")
            try:
                prev_ops = await self._load_bundle_ops(last_bundle['bundle_number'])
                self._seen_cids = self._get_boundary_cids(prev_ops)
                print(f" Seeded de-duplication set with {len(self._seen_cids)} boundary CIDs.")
            except FileNotFoundError:
                print(" Warning: Could not load previous bundle file. Boundary deduplication may be incomplete.")
        else:
            print('Starting from the beginning (genesis bundle).')

    async def run(self):
        """
        The main execution loop. It continuously fetches operations, validates and
        de-duplicates them, fills the mempool, and creates bundles when ready.
106 """ 107 last_bundle = self._index['bundles'][-1] if self._index['bundles'] else None 108 cursor = last_bundle['end_time'] if last_bundle else None 109 110 while True: 111 try: 112 print(f"\nFetching operations from cursor: {cursor or 'start'}...") 113 fetched_ops = await self._fetch_operations(cursor) 114 if not fetched_ops: 115 print('No more operations available from PLC directory.') 116 break 117 118 self._process_and_validate_ops(fetched_ops) 119 cursor = fetched_ops[-1]['createdAt'] 120 121 while len(self._mempool) >= BUNDLE_SIZE: 122 await self._create_and_save_bundle() 123 124 await asyncio.sleep(0.2) # Be nice to the server 125 except httpx.HTTPStatusError as e: 126 print(f"\nError: HTTP {e.response.status_code} - {e.response.text}") 127 break 128 except Exception as e: 129 print(f"\nAn unexpected error occurred: {e}") 130 break 131 132 await self._save_index() 133 print(f"\n---\nProcess complete.") 134 print(f"Total bundles in index: {len(self._index['bundles'])}") 135 print(f"Operations in mempool: {len(self._mempool)}") 136 total_mb = self._index['total_size_bytes'] / 1024 / 1024 137 print(f"Total size: {total_mb:.2f} MB") 138 139 # --- Private Helper Methods --- 140 141 async def _fetch_operations(self, after: str | None) -> list[PLCOperation]: 142 params = {'count': 1000} 143 if after: 144 params['after'] = after 145 146 response = await self._http_client.get(f"{PLC_URL}/export", params=params) 147 response.raise_for_status() 148 149 lines = response.text.strip().split('\n') 150 if not lines or not lines[0]: 151 return [] 152 153 # Important: The `_raw` key is added here to preserve the original JSON string, 154 # ensuring byte-for-byte reproducibility as required by Spec 4.2. 155 return [{**json.loads(line), '_raw': line} for line in lines] 156 157 def _process_and_validate_ops(self, ops: list[PLCOperation]): 158 last_op = self._mempool[-1] if self._mempool else None 159 last_bundle = self._index['bundles'][-1] if self._index['bundles'] else None 160 last_timestamp = last_op['createdAt'] if last_op else (last_bundle['end_time'] if last_bundle else '') 161 162 new_ops_count = 0 163 for op in ops: 164 if op['cid'] in self._seen_cids: 165 continue 166 167 if op['createdAt'] < last_timestamp: 168 raise ValueError(f"Chronological validation failed: op {op['cid']} at {op['createdAt']} is older than last op at {last_timestamp}") 169 170 self._mempool.append(op) 171 self._seen_cids.add(op['cid']) 172 last_timestamp = op['createdAt'] 173 new_ops_count += 1 174 print(f" Added {new_ops_count} new operations to mempool.") 175 176 async def _create_and_save_bundle(self): 177 bundle_ops = self._mempool[:BUNDLE_SIZE] 178 self._mempool = self._mempool[BUNDLE_SIZE:] 179 180 last_bundle = self._index['bundles'][-1] if self._index['bundles'] else None 181 parent_hash = last_bundle['hash'] if last_bundle else '' 182 183 # Spec 4.2 & 6.3: Hashing and serialization must be exact. 
        jsonl_data = "".join([op['_raw'] + '\n' for op in bundle_ops]).encode('utf-8')
        content_hash = hashlib.sha256(jsonl_data).hexdigest()
        chain_hash = self._calculate_chain_hash(parent_hash, content_hash)
        compressed_data = zstd.compress(jsonl_data, 3)

        bundle_number = self._index['last_bundle'] + 1
        filename = f"{bundle_number:06d}.jsonl.zst"
        print(f"\nCreating bundle {filename}...")
        (self._bundle_dir / filename).write_bytes(compressed_data)

        self._index['bundles'].append({
            'bundle_number': bundle_number,
            'start_time': bundle_ops[0]['createdAt'],
            'end_time': bundle_ops[-1]['createdAt'],
            'operation_count': len(bundle_ops),
            'did_count': len({op['did'] for op in bundle_ops}),
            'hash': chain_hash,
            'content_hash': content_hash,
            'parent': parent_hash,
            'compressed_hash': hashlib.sha256(compressed_data).hexdigest(),
            'compressed_size': len(compressed_data),
            'uncompressed_size': len(jsonl_data),
            'cursor': last_bundle['end_time'] if last_bundle else '',
            'created_at': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
        })
        self._index['last_bundle'] = bundle_number
        self._index['total_size_bytes'] += len(compressed_data)

        # Prune `seen_cids` to keep it memory-efficient.
        new_boundary_cids = self._get_boundary_cids(bundle_ops)
        mempool_cids = {op['cid'] for op in self._mempool}
        self._seen_cids = new_boundary_cids.union(mempool_cids)

        await self._save_index()
        print(f" ✓ Saved. Hash: {chain_hash[:16]}...")
        print(f" Pruned de-duplication set to {len(self._seen_cids)} CIDs.")

    async def _load_index(self) -> Index:
        try:
            return json.loads((self._bundle_dir / INDEX_FILE).read_text())
        except FileNotFoundError:
            return {'version': '1.0', 'last_bundle': 0, 'updated_at': '', 'total_size_bytes': 0, 'bundles': []}

    async def _save_index(self):
        self._index['updated_at'] = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
        temp_path = self._bundle_dir / f"{INDEX_FILE}.tmp"
        temp_path.write_text(json.dumps(self._index, indent=2))
        temp_path.replace(self._bundle_dir / INDEX_FILE)  # atomic, even if the index already exists

    async def _load_bundle_ops(self, bundle_number: int) -> list[PLCOperation]:
        filename = f"{bundle_number:06d}.jsonl.zst"
        compressed = (self._bundle_dir / filename).read_bytes()
        decompressed = zstd.decompress(compressed).decode('utf-8')
        return [{**json.loads(line), '_raw': line} for line in decompressed.strip().split('\n')]

    # --- Static Utilities ---

    @staticmethod
    def _calculate_chain_hash(parent: str, content_hash: str) -> str:
        data = f"{parent}:{content_hash}" if parent else f"plcbundle:genesis:{content_hash}"
        return hashlib.sha256(data.encode('utf-8')).hexdigest()

    @staticmethod
    def _get_boundary_cids(ops: list[PLCOperation]) -> set[str]:
        if not ops:
            return set()
        last_time = ops[-1]['createdAt']
        return {op['cid'] for op in reversed(ops) if op['createdAt'] == last_time}

async def main():
    """Entry point for the script."""
    dir_path = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DIR
    manager = await PlcBundleManager.create(dir_path)
    await manager.run()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nProcess interrupted by user.")
    except Exception as e:
        print(f"\nFATAL ERROR: {e}", file=sys.stderr)
        sys.exit(1)
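
As a companion, here is a minimal consumer-side sketch showing how a downstream tool could re-check the content hashes and hash chain recorded in plc_bundles.json, using the same hashing rules as _calculate_chain_hash and _create_and_save_bundle above. It is only an illustration, not part of the specification; the script name verify_bundles.py and the function verify_bundles are hypothetical.

#!/usr/bin/env python3
"""verify_bundles.py - illustrative sketch: re-verify bundles written by plcbundle.py."""

import hashlib
import json
import sys
from pathlib import Path

import zstd

def verify_bundles(bundle_dir: str) -> None:
    root = Path(bundle_dir)
    index = json.loads((root / 'plc_bundles.json').read_text())
    parent = ''
    for meta in index['bundles']:
        compressed = (root / f"{meta['bundle_number']:06d}.jsonl.zst").read_bytes()
        # Content hash is computed over the uncompressed JSONL bytes,
        # mirroring _create_and_save_bundle above.
        content_hash = hashlib.sha256(zstd.decompress(compressed)).hexdigest()
        # Chain hash mirrors _calculate_chain_hash: a genesis bundle has no parent.
        seed = f"{parent}:{content_hash}" if parent else f"plcbundle:genesis:{content_hash}"
        chain_hash = hashlib.sha256(seed.encode('utf-8')).hexdigest()
        if content_hash != meta['content_hash'] or chain_hash != meta['hash']:
            raise ValueError(f"Bundle {meta['bundle_number']} failed verification")
        parent = chain_hash
    print(f"Verified {len(index['bundles'])} bundles.")

if __name__ == '__main__':
    verify_bundles(sys.argv[1] if len(sys.argv) > 1 else './plc_bundles_py')

Run it against the same directory the archiver wrote to (for example, python verify_bundles.py ./plc_bundles_py) once at least one bundle exists.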