// PLC Bundle V1 Example Implementations
// at main 11 kB view raw
/**
 * plcbundle.ts - A compact, readable reference implementation for creating
 * plcbundle V1 compliant archives. This script demonstrates all critical spec
 * requirements, including hashing, serialization, ordering, and boundary handling.
 *
 * PLC Bundle v1 Specification:
 *   https://tangled.org/atscan.net/plcbundle/blob/main/docs/specification.md
 */

import fs from 'node:fs/promises';
import path from 'node:path';
import crypto from 'node:crypto';
import { init, compress, decompress } from '@bokuweb/zstd-wasm';
import axios from 'axios';

// --- Configuration ---
/** Number of operations stored in every bundle. */
const BUNDLE_SIZE = 10000;
/** File name of the JSON index kept inside the bundle directory. */
const INDEX_FILE = 'plc_bundles.json';
/** Bundle directory used when none is given on the command line. */
const DEFAULT_DIR = './plc_bundles_ts';
/** Base URL of the PLC directory whose `/export` endpoint is polled. */
const PLC_URL = 'https://plc.directory';
/** Value of the `count` query parameter sent with every `/export` request. */
const FETCH_COUNT = 1000;

// --- Types (as per spec) ---
interface PLCOperation {
  did: string;
  cid: string;
  createdAt: string;
  operation: Record<string, any>;
  nullified?: boolean | string;
  _raw: string; // Holds the original raw JSON string for reproducibility.
}

interface BundleMetadata {
  bundle_number: number;
  start_time: string;
  end_time: string;
  operation_count: number;
  did_count: number;
  hash: string; // The chain hash.
  content_hash: string;
  parent: string;
  compressed_hash: string;
  compressed_size: number;
  uncompressed_size: number;
  cursor: string;
  created_at: string;
}

interface Index {
  version: string;
  last_bundle: number;
  updated_at: string;
  total_size_bytes: number;
  bundles: BundleMetadata[];
}

// --- ZSTD Initialization ---
// The zstd bindings must be initialized before `compress`/`decompress` are called.
await init();

/**
 * Manages the state and process of fetching, validating, and creating PLC bundles.
 *
 * Operations are pulled from `${PLC_URL}/export`, de-duplicated by CID, checked for
 * chronological order, buffered in an in-memory mempool, and written to disk as
 * zstd-compressed JSONL files of exactly `BUNDLE_SIZE` operations each. Metadata
 * for every bundle is recorded in the `INDEX_FILE` index.
 */
class PlcBundleManager {
  private index!: Index;
  private mempool: PLCOperation[] = [];
  // CIDs that must not be added to the mempool again: the CIDs sharing the last
  // timestamp of the most recent bundle (its "boundary") plus everything
  // currently in the mempool. Pruned after every bundle to stay memory-efficient.
  private seenCIDs = new Set<string>();

  private constructor(private bundleDir: string) {}

  /**
   * Factory to create and asynchronously initialize a PlcBundleManager instance.
   *
   * @param bundleDir Directory holding the bundle files and the index.
   * @returns A manager whose index is loaded and whose de-duplication set is seeded.
   * @throws If the directory cannot be created or the index exists but cannot be read/parsed.
   */
  public static async create(bundleDir: string): Promise<PlcBundleManager> {
    const manager = new PlcBundleManager(bundleDir);
    await manager.init();
    return manager;
  }

  /**
   * Initializes the manager by loading the index and seeding the `seenCIDs`
   * set with the CIDs from the last saved bundle's boundary.
   */
  private async init(): Promise<void> {
    await fs.mkdir(this.bundleDir, { recursive: true });
    this.index = await this._loadIndex();
    console.log(`plcbundle Reference Implementation\nDirectory: ${this.bundleDir}\n`);

    const lastBundle = this.index.bundles.at(-1);
    if (!lastBundle) {
      console.log('Starting from the beginning (genesis bundle).');
      return;
    }

    console.log(`Resuming from bundle ${lastBundle.bundle_number + 1}. Last op time: ${lastBundle.end_time}`);
    try {
      // Pre-seed the de-duplication set with CIDs from the previous bundle's boundary.
      const prevOps = await this._loadBundleOps(lastBundle.bundle_number);
      this.seenCIDs = PlcBundleManager._getBoundaryCIDs(prevOps);
      console.log(` Seeded de-duplication set with ${this.seenCIDs.size} boundary CIDs.`);
    } catch {
      // Deliberately best-effort: a missing/unreadable bundle file only weakens
      // boundary de-duplication, it does not prevent resuming.
      console.warn(` Warning: Could not load previous bundle file. Boundary deduplication may be incomplete.`);
    }
  }

  /**
   * The main execution loop. Fetches, validates, de-duplicates, and bundles operations.
   *
   * Runs until the export endpoint returns no operations or a non-retryable
   * error occurs, then saves the index and prints a summary. Connection resets
   * and aborts are retried after a 5 second pause.
   */
  async run(): Promise<void> {
    let cursor = this.index.bundles.at(-1)?.end_time || null;

    while (true) {
      try {
        console.log(`\nFetching operations from cursor: ${cursor || 'start'}...`);
        const fetchedOps = await this._fetchOperations(cursor);
        const lastFetched = fetchedOps.at(-1);
        if (!lastFetched) {
          console.log('No more operations available.');
          break;
        }

        // The core ingestion logic: de-duplicate and validate operations before adding to the mempool.
        this._processAndValidateOps(fetchedOps);
        cursor = lastFetched.createdAt;

        // Create bundles as long as the mempool is full.
        while (this.mempool.length >= BUNDLE_SIZE) {
          await this._createAndSaveBundle();
        }

        await new Promise(resolve => setTimeout(resolve, 200)); // Be nice.
      } catch (err: unknown) {
        // Narrowed view of the fields this handler reads; all are optional
        // because anything can be thrown.
        const failure = (err ?? {}) as { message?: string; code?: string; response?: { status?: number } };
        console.error(`\nError: ${failure.message}`);
        if (failure.response) console.error(`HTTP Status: ${failure.response.status}`);
        if (failure.code === 'ECONNRESET' || failure.code === 'ECONNABORTED') {
          console.log('Connection error, retrying in 5 seconds...');
          await new Promise(resolve => setTimeout(resolve, 5000));
          continue;
        }
        break;
      }
    }

    await this._saveIndex();
    console.log(`\n---`);
    console.log('Process complete.');
    console.log(`Total bundles in index: ${this.index.bundles.length}`);
    console.log(`Operations in mempool: ${this.mempool.length}`);
    console.log(`Total size: ${(this.index.total_size_bytes / 1024 / 1024).toFixed(2)} MB`);
  }

  // ==========================================================================
  // Private Helper Methods
  // ==========================================================================

  /**
   * Requests one page of operations from the export endpoint.
   *
   * @param after Cursor sent as the `after` query parameter; omitted when null or empty.
   * @returns The parsed operations, each carrying its raw JSON line in `_raw`;
   *          an empty array when the response body is empty.
   */
  private async _fetchOperations(after: string | null): Promise<PLCOperation[]> {
    const params: Record<string, string | number> = { count: FETCH_COUNT };
    if (after) params.after = after;
    // Request plain text so each line is kept byte-for-byte for `_raw`.
    const response = await axios.get<string>(`${PLC_URL}/export`, { params, responseType: 'text' });
    return PlcBundleManager._parseJSONL(response.data);
  }

  /**
   * Processes a batch of fetched operations. It ensures each operation is unique
   * (both within the batch and across bundle boundaries) and that it maintains
   * chronological order before adding it to the mempool.
   *
   * @param ops Operations in the order they were returned by the export endpoint.
   * @throws Error when an unseen operation is older than the one preceding it.
   */
  private _processAndValidateOps(ops: PLCOperation[]): void {
    // Ordering baseline: newest mempool entry, else the last bundle's end, else none.
    let lastTimestamp = this.mempool.at(-1)?.createdAt ?? this.index.bundles.at(-1)?.end_time ?? '';
    let newOpsCount = 0;

    for (const op of ops) {
      // Handles both boundary dupes and within-batch dupes.
      if (this.seenCIDs.has(op.cid)) {
        continue;
      }

      // Spec 3: Validate that the stream is chronological (timestamps are compared as strings).
      if (op.createdAt < lastTimestamp) {
        throw new Error(`Chronological validation failed: op ${op.cid} at ${op.createdAt} is older than last op at ${lastTimestamp}`);
      }

      this.mempool.push(op);
      this.seenCIDs.add(op.cid); // Add the CID to the set only after it's confirmed valid.
      lastTimestamp = op.createdAt;
      newOpsCount++;
    }
    console.log(` Added ${newOpsCount} new operations to mempool.`);
  }

  /**
   * Takes the first `BUNDLE_SIZE` operations off the mempool, writes them as a
   * compressed bundle file, records the bundle in the index, and prunes the
   * `seenCIDs` set to maintain memory efficiency.
   */
  private async _createAndSaveBundle(): Promise<void> {
    const bundleOps = this.mempool.splice(0, BUNDLE_SIZE);
    const firstOp = bundleOps[0];
    const lastOp = bundleOps.at(-1);
    if (!firstOp || !lastOp) return; // Nothing buffered: never write an empty bundle.

    const previousBundle = this.index.bundles.at(-1);
    const parentHash = previousBundle?.hash ?? '';
    const currentBundleNumber = this.index.last_bundle + 1;
    const filename = `${String(currentBundleNumber).padStart(6, '0')}.jsonl.zst`;
    console.log(`\nCreating bundle ${filename}...`);

    // Spec 4.2 & 6.3: Hashing and serialization must be exact.
    // Encode once and reuse the same bytes for hashing, compression, and sizing.
    const jsonlBytes = Buffer.from(PlcBundleManager._serializeJSONL(bundleOps), 'utf8');
    const contentHash = PlcBundleManager._sha256(jsonlBytes);
    const chainHash = PlcBundleManager._calculateChainHash(parentHash, contentHash);
    const compressedBuffer = Buffer.from(compress(jsonlBytes, 3));

    await fs.writeFile(path.join(this.bundleDir, filename), compressedBuffer);

    this.index.bundles.push({
      bundle_number: currentBundleNumber,
      start_time: firstOp.createdAt,
      end_time: lastOp.createdAt,
      operation_count: bundleOps.length,
      did_count: new Set(bundleOps.map(op => op.did)).size,
      hash: chainHash,
      content_hash: contentHash,
      parent: parentHash,
      compressed_hash: PlcBundleManager._sha256(compressedBuffer),
      compressed_size: compressedBuffer.length,
      uncompressed_size: jsonlBytes.length,
      // The cursor is the end time of the bundle that preceded this one ('' for the first).
      cursor: previousBundle?.end_time ?? '',
      created_at: new Date().toISOString(),
    });
    this.index.last_bundle = currentBundleNumber;
    this.index.total_size_bytes += compressedBuffer.length;

    // Prune the `seenCIDs` set to keep it memory-efficient. It only needs to hold CIDs
    // from the new boundary and the remaining mempool, not all CIDs ever seen.
    const newBoundaryCIDs = PlcBundleManager._getBoundaryCIDs(bundleOps);
    const mempoolCIDs = this.mempool.map(op => op.cid);
    this.seenCIDs = new Set([...newBoundaryCIDs, ...mempoolCIDs]);

    await this._saveIndex();
    console.log(` ✓ Saved. Hash: ${chainHash.substring(0, 16)}...`);
    console.log(` Pruned de-duplication set to ${this.seenCIDs.size} CIDs.`);
  }

  /**
   * Loads the index from the bundle directory.
   *
   * @returns The stored index, or a fresh empty index when no index file exists yet.
   * @throws If the index file exists but cannot be read or is not valid JSON.
   *         Falling back to an empty index in that case would restart numbering
   *         at bundle 1 and overwrite existing bundle files.
   */
  private async _loadIndex(): Promise<Index> {
    const indexPath = path.join(this.bundleDir, INDEX_FILE);

    let data: string;
    try {
      data = await fs.readFile(indexPath, 'utf8');
    } catch (err: unknown) {
      const code = (err as { code?: unknown } | null)?.code;
      if (code === 'ENOENT') {
        return { version: '1.0', last_bundle: 0, updated_at: '', total_size_bytes: 0, bundles: [] };
      }
      throw err;
    }

    try {
      return JSON.parse(data) as Index;
    } catch (err: unknown) {
      const reason = err instanceof Error ? err.message : String(err);
      throw new Error(`Index file ${indexPath} is not valid JSON: ${reason}`);
    }
  }

  /**
   * Stamps `updated_at` and writes the index to disk. The data is written to a
   * temporary file first and then renamed over the real index file.
   */
  private async _saveIndex(): Promise<void> {
    this.index.updated_at = new Date().toISOString();
    const tempPath = path.join(this.bundleDir, INDEX_FILE + '.tmp');
    await fs.writeFile(tempPath, JSON.stringify(this.index, null, 2));
    await fs.rename(tempPath, path.join(this.bundleDir, INDEX_FILE));
  }

  /**
   * Reads and decompresses a bundle file and parses its operations.
   *
   * @param bundleNumber Number of the bundle; the file name is the number zero-padded to 6 digits.
   * @returns The operations stored in the bundle, each with its raw line in `_raw`.
   * @throws If the file cannot be read, decompressed, or parsed.
   */
  private async _loadBundleOps(bundleNumber: number): Promise<PLCOperation[]> {
    const filename = `${String(bundleNumber).padStart(6, '0')}.jsonl.zst`;
    const compressed = await fs.readFile(path.join(this.bundleDir, filename));
    const decompressed = Buffer.from(decompress(compressed)).toString('utf8');
    return PlcBundleManager._parseJSONL(decompressed);
  }

  // ==========================================================================
  // Static Utility Methods
  // ==========================================================================

  /** Hex-encoded SHA-256 digest of the given data. */
  private static _sha256 = (d: string | Buffer): string => crypto.createHash('sha256').update(d).digest('hex');

  /** Joins the raw JSON of each operation, terminating every line (including the last) with '\n'. */
  private static _serializeJSONL = (ops: PLCOperation[]): string => ops.map(op => op._raw + '\n').join('');

  /**
   * Parses newline-delimited JSON into operations, keeping each source line in `_raw`.
   * Trailing whitespace of the payload and blank lines are ignored, so an empty
   * payload yields an empty array instead of a parse error.
   */
  private static _parseJSONL = (text: string): PLCOperation[] =>
    text
      .trimEnd()
      .split('\n')
      .filter(line => line.trim() !== '')
      .map(line => ({ ...JSON.parse(line), _raw: line }));

  /**
   * Chain hash of a bundle: SHA-256 of `<parent>:<content>` when a parent hash
   * exists, otherwise of `plcbundle:genesis:<content>`.
   */
  private static _calculateChainHash = (p: string, c: string): string =>
    PlcBundleManager._sha256(p ? `${p}:${c}` : `plcbundle:genesis:${c}`);

  /**
   * Collects the CIDs of the trailing run of operations that share the
   * `createdAt` value of the last operation.
   *
   * @returns The boundary CIDs; an empty set for an empty input.
   */
  private static _getBoundaryCIDs = (ops: PLCOperation[]): Set<string> => {
    const cids = new Set<string>();
    const lastTime = ops.at(-1)?.createdAt;
    if (lastTime === undefined) return cids;
    // Walk backwards and stop at the first operation with a different timestamp.
    for (let i = ops.length - 1; i >= 0; i--) {
      const op = ops[i];
      if (!op || op.createdAt !== lastTime) break;
      cids.add(op.cid);
    }
    return cids;
  };
}

// --- Entry Point ---
// Usage: the optional first CLI argument selects the bundle directory.
(async () => {
  const dir = process.argv[2] || DEFAULT_DIR;
  const manager = await PlcBundleManager.create(dir);
  await manager.run();
})().catch(err => {
  console.error('\nFATAL ERROR:', err.message, err.stack);
  process.exit(1);
});