PLC Bundle V1 Example Implementations
/**
 * plcbundle.ts - A compact, readable reference implementation for creating
 * plcbundle V1 compliant archives. This script demonstrates all critical spec
 * requirements, including hashing, serialization, ordering, and boundary handling.
 *
 * PLC Bundle v1 Specification:
 * https://tangled.org/atscan.net/plcbundle/blob/main/docs/specification.md
 */

import fs from 'node:fs/promises';
import path from 'node:path';
import crypto from 'node:crypto';
import { init, compress, decompress } from '@bokuweb/zstd-wasm';
import axios from 'axios';

// --- Configuration ---
const BUNDLE_SIZE = 10000;
const INDEX_FILE = 'plc_bundles.json';
const DEFAULT_DIR = './plc_bundles_ts';
const PLC_URL = 'https://plc.directory';

// --- Types (as per spec) ---
interface PLCOperation {
  did: string;
  cid: string;
  createdAt: string;
  operation: Record<string, any>;
  nullified?: boolean | string;
  _raw: string; // Holds the original raw JSON string for reproducibility.
}

interface BundleMetadata {
  bundle_number: number;
  start_time: string;
  end_time: string;
  operation_count: number;
  did_count: number;
  hash: string; // The chain hash.
  content_hash: string;
  parent: string;
  compressed_hash: string;
  compressed_size: number;
  uncompressed_size: number;
  cursor: string;
  created_at: string;
}

interface Index {
  version: string;
  last_bundle: number;
  updated_at: string;
  total_size_bytes: number;
  bundles: BundleMetadata[];
}

// --- ZSTD Initialization ---
await init();

/**
 * Manages the state and process of fetching, validating, and creating PLC bundles.
 */
class PlcBundleManager {
  private index!: Index;
  private mempool: PLCOperation[] = [];
  // De-duplicates operations both across the previous bundle's boundary and
  // within new batches; pruned after each bundle to stay memory-efficient.
  private seenCIDs = new Set<string>();

  private constructor(private bundleDir: string) {}

  /**
   * Factory to create and asynchronously initialize a PlcBundleManager instance.
   */
  public static async create(bundleDir: string): Promise<PlcBundleManager> {
    const manager = new PlcBundleManager(bundleDir);
    await manager.init();
    return manager;
  }

  /**
   * Initializes the manager by loading the index and seeding the `seenCIDs`
   * set with the CIDs from the last saved bundle's boundary.
   */
  private async init() {
    await fs.mkdir(this.bundleDir, { recursive: true });
    this.index = await this._loadIndex();
    console.log(`plcbundle Reference Implementation\nDirectory: ${this.bundleDir}\n`);

    const lastBundle = this.index.bundles.at(-1);
    if (lastBundle) {
      console.log(`Resuming from bundle ${lastBundle.bundle_number + 1}. Last op time: ${lastBundle.end_time}`);
      try {
        // Pre-seed the de-duplication set with CIDs from the previous bundle's boundary.
        const prevOps = await this._loadBundleOps(lastBundle.bundle_number);
        this.seenCIDs = PlcBundleManager._getBoundaryCIDs(prevOps);
        console.log(` Seeded de-duplication set with ${this.seenCIDs.size} boundary CIDs.`);
      } catch (e) {
        console.warn(` Warning: Could not load previous bundle file. Boundary deduplication may be incomplete.`);
      }
    } else {
      console.log('Starting from the beginning (genesis bundle).');
    }
  }

  /**
   * The main execution loop. Fetches, validates, de-duplicates, and bundles operations.
   */
  async run() {
    let cursor = this.index.bundles.at(-1)?.end_time || null;

    while (true) {
      try {
        console.log(`\nFetching operations from cursor: ${cursor || 'start'}...`);
        const fetchedOps = await this._fetchOperations(cursor);
        if (fetchedOps.length === 0) {
          console.log('No more operations available.');
          break;
        }

        // The core ingestion logic: de-duplicate and validate operations before adding to the mempool.
        this._processAndValidateOps(fetchedOps);
        cursor = fetchedOps.at(-1)!.createdAt;

        // Create bundles as long as the mempool is full.
        while (this.mempool.length >= BUNDLE_SIZE) {
          await this._createAndSaveBundle();
        }

        await new Promise(resolve => setTimeout(resolve, 200)); // Be nice.
      } catch (err: any) {
        console.error(`\nError: ${err.message}`);
        if (err.response) console.error(`HTTP Status: ${err.response.status}`);
        if (['ECONNRESET', 'ECONNABORTED'].includes(err.code)) {
          console.log('Connection error, retrying in 5 seconds...');
          await new Promise(resolve => setTimeout(resolve, 5000));
          continue;
        }
        break;
      }
    }

    await this._saveIndex();
    console.log(`\n---`);
    console.log('Process complete.');
    console.log(`Total bundles in index: ${this.index.bundles.length}`);
    console.log(`Operations in mempool: ${this.mempool.length}`);
    console.log(`Total size: ${(this.index.total_size_bytes / 1024 / 1024).toFixed(2)} MB`);
  }

  // ==========================================================================
  // Private Helper Methods
  // ==========================================================================

  private async _fetchOperations(after: string | null): Promise<PLCOperation[]> {
    const params = { count: 1000, ...(after && { after }) };
    const response = await axios.get<string>(`${PLC_URL}/export`, { params, responseType: 'text' });
    const lines = response.data.trimEnd().split('\n');
    if (lines.length === 1 && lines[0] === '') return [];
    return lines.map(line => ({ ...JSON.parse(line), _raw: line }));
  }

  /**
   * Processes a batch of fetched operations, ensuring each operation is unique
   * (both within the batch and across bundle boundaries) and that the stream
   * stays chronological before anything is added to the mempool.
   */
  private _processAndValidateOps(ops: PLCOperation[]) {
    let lastTimestamp = this.mempool.at(-1)?.createdAt ?? this.index.bundles.at(-1)?.end_time ?? '';
    let newOpsCount = 0;

    for (const op of ops) {
      // This check handles both boundary duplicates and within-batch duplicates.
      if (this.seenCIDs.has(op.cid)) {
        continue;
      }

      // Spec 3: Validate that the stream is chronological.
      if (op.createdAt < lastTimestamp) {
        throw new Error(`Chronological validation failed: op ${op.cid} at ${op.createdAt} is older than last op at ${lastTimestamp}`);
      }

      this.mempool.push(op);
      this.seenCIDs.add(op.cid); // Add the CID to the set only after it's confirmed valid.
      lastTimestamp = op.createdAt;
      newOpsCount++;
    }
    console.log(` Added ${newOpsCount} new operations to mempool.`);
  }

  /**
   * Creates a bundle and prunes the `seenCIDs` set to maintain memory efficiency.
   */
  private async _createAndSaveBundle() {
    const bundleOps = this.mempool.splice(0, BUNDLE_SIZE);
    const parentHash = this.index.bundles.at(-1)?.hash ?? '';

    // Spec 4.2 & 6.3: Hashing and serialization must be exact.
    const jsonl = PlcBundleManager._serializeJSONL(bundleOps);
    const contentHash = PlcBundleManager._sha256(Buffer.from(jsonl, 'utf8'));
    const chainHash = PlcBundleManager._calculateChainHash(parentHash, contentHash);
    const compressedBuffer = Buffer.from(compress(Buffer.from(jsonl, 'utf8'), 3));

    const currentBundleNumber = this.index.last_bundle + 1;
    const filename = `${String(currentBundleNumber).padStart(6, '0')}.jsonl.zst`;
    console.log(`\nCreating bundle ${filename}...`);
    await fs.writeFile(path.join(this.bundleDir, filename), compressedBuffer);

    this.index.bundles.push({
      bundle_number: currentBundleNumber,
      start_time: bundleOps[0].createdAt,
      end_time: bundleOps.at(-1)!.createdAt,
      operation_count: bundleOps.length,
      did_count: new Set(bundleOps.map(op => op.did)).size,
      hash: chainHash,
      content_hash: contentHash,
      parent: parentHash,
      compressed_hash: PlcBundleManager._sha256(compressedBuffer),
      compressed_size: compressedBuffer.length,
      uncompressed_size: Buffer.from(jsonl, 'utf8').length,
      cursor: this.index.bundles.at(-1)?.end_time ?? '',
      created_at: new Date().toISOString()
    });
    this.index.last_bundle = currentBundleNumber;
    this.index.total_size_bytes += compressedBuffer.length;

    // Prune the `seenCIDs` set to keep it memory-efficient. It only needs to hold CIDs
    // from the new boundary and the remaining mempool, not all CIDs ever seen.
    const newBoundaryCIDs = PlcBundleManager._getBoundaryCIDs(bundleOps);
    const mempoolCIDs = new Set(this.mempool.map(op => op.cid));
    this.seenCIDs = new Set([...newBoundaryCIDs, ...mempoolCIDs]);

    await this._saveIndex();
    console.log(` ✓ Saved. Hash: ${chainHash.substring(0, 16)}...`);
    console.log(` Pruned de-duplication set to ${this.seenCIDs.size} CIDs.`);
  }

  private async _loadIndex(): Promise<Index> {
    try {
      const data = await fs.readFile(path.join(this.bundleDir, INDEX_FILE), 'utf8');
      return JSON.parse(data);
    } catch (err) {
      return { version: '1.0', last_bundle: 0, updated_at: '', total_size_bytes: 0, bundles: [] };
    }
  }

  private async _saveIndex(): Promise<void> {
    this.index.updated_at = new Date().toISOString();
    const tempPath = path.join(this.bundleDir, INDEX_FILE + '.tmp');
    await fs.writeFile(tempPath, JSON.stringify(this.index, null, 2));
    await fs.rename(tempPath, path.join(this.bundleDir, INDEX_FILE));
  }

  private async _loadBundleOps(bundleNumber: number): Promise<PLCOperation[]> {
    const filename = `${String(bundleNumber).padStart(6, '0')}.jsonl.zst`;
    const compressed = await fs.readFile(path.join(this.bundleDir, filename));
    const decompressed = Buffer.from(decompress(compressed)).toString('utf8');
    return decompressed.trimEnd().split('\n').map(line => ({ ...JSON.parse(line), _raw: line }));
  }

  // ==========================================================================
  // Static Utility Methods
  // ==========================================================================

  private static _sha256 = (d: string | Buffer): string => crypto.createHash('sha256').update(d).digest('hex');
  private static _serializeJSONL = (ops: PLCOperation[]): string => ops.map(op => op._raw + '\n').join('');
  private static _calculateChainHash = (p: string, c: string): string => PlcBundleManager._sha256(p ? `${p}:${c}` : `plcbundle:genesis:${c}`);
  private static _getBoundaryCIDs = (ops: PLCOperation[]): Set<string> => {
    if (!ops.length) return new Set();
    const lastTime = ops.at(-1)!.createdAt;
    const cids = new Set<string>();
    for (let i = ops.length - 1; i >= 0 && ops[i].createdAt === lastTime; i--) cids.add(ops[i].cid);
    return cids;
  };
}

// --- Entry Point ---
(async () => {
  const dir = process.argv[2] || DEFAULT_DIR;
  const manager = await PlcBundleManager.create(dir);
  await manager.run();
})().catch(err => {
  console.error('\nFATAL ERROR:', err.message, err.stack);
  process.exit(1);
});