// fork
// Configure Feed
// Select the types of activity you want to include in your feed.
// PLC Bundle V1 Example Implementations
// fork
// Configure Feed
// Select the types of activity you want to include in your feed.
/**
 * plcbundle.ts - A compact, readable reference implementation for creating
 * plcbundle V1 compliant archives. This script demonstrates all critical spec
 * requirements, including hashing, serialization, ordering, and boundary handling.
 *
 * PLC Bundle v1 Specification:
 * https://tangled.org/atscan.net/plcbundle/blob/main/docs/specification.md
 */
9
import fs from 'node:fs/promises';
import path from 'node:path';
import crypto from 'node:crypto';
import { init, compress, decompress } from '@bokuweb/zstd-wasm';
import axios from 'axios';
15
// --- Configuration ---
/** Number of operations packed into each bundle file. */
const BUNDLE_SIZE = 10000;
/** File name of the JSON index kept inside the bundle directory. */
const INDEX_FILE = 'plc_bundles.json';
/** Bundle directory used when no directory is passed on the command line. */
const DEFAULT_DIR = './plc_bundles_ts';
/** Base URL of the PLC directory whose `/export` endpoint is read. */
const PLC_URL = 'https://plc.directory';
21
// --- Types (as per spec) ---
/**
 * One operation record as read from a JSONL line, plus the raw line it was
 * parsed from.
 */
interface PLCOperation {
  did: string;
  cid: string;
  /** Timestamp string; compared lexicographically for ordering checks. */
  createdAt: string;
  /** Operation payload; never inspected by this script, so left opaque. */
  operation: Record<string, unknown>;
  nullified?: boolean | string;
  /** Holds the original raw JSON string for reproducibility. */
  _raw: string;
}
31
/** Index entry describing one saved bundle file. */
interface BundleMetadata {
  bundle_number: number;
  /** `createdAt` of the first operation in the bundle. */
  start_time: string;
  /** `createdAt` of the last operation in the bundle. */
  end_time: string;
  operation_count: number;
  /** Number of distinct DIDs among the bundle's operations. */
  did_count: number;
  /** The chain hash. */
  hash: string;
  /** SHA-256 (hex) of the uncompressed JSONL content. */
  content_hash: string;
  /** Chain hash of the previous bundle, or '' for the first bundle. */
  parent: string;
  /** SHA-256 (hex) of the compressed file bytes. */
  compressed_hash: string;
  compressed_size: number;
  uncompressed_size: number;
  /** `end_time` of the previous bundle, or '' for the first bundle. */
  cursor: string;
  /** ISO timestamp of when this entry was created. */
  created_at: string;
}
47
/** Shape of the JSON index file that lists every saved bundle. */
interface Index {
  version: string;
  /** Number of the most recently saved bundle; 0 when none exist yet. */
  last_bundle: number;
  /** ISO timestamp of the last index write. */
  updated_at: string;
  /** Sum of the compressed sizes of all bundles. */
  total_size_bytes: number;
  bundles: BundleMetadata[];
}
55
// --- ZSTD Initialization ---
// Initialize the zstd WASM module once at module load, before any of the
// compress()/decompress() calls below. Top-level await requires this file to
// be loaded as an ES module.
await init();
58
/**
 * Manages the state and process of fetching, validating, and creating PLC bundles.
 *
 * Operations are fetched from the `/export` endpoint, de-duplicated by CID,
 * checked for chronological order, buffered in a mempool, and written out as
 * zstd-compressed JSONL files of BUNDLE_SIZE operations each. Every bundle is
 * recorded in the index file together with its content and chain hashes.
 */
class PlcBundleManager {
  /** Error codes that `run()` treats as transient connection failures. */
  private static readonly RETRYABLE_CODES: readonly string[] = ['ECONNRESET', 'ECONNABORTED'];

  private index!: Index;
  private mempool: PLCOperation[] = [];
  // This set de-duplicates operations, both from the previous bundle's
  // boundary and within new batches, and is pruned to stay memory-efficient.
  private seenCIDs = new Set<string>();

  private constructor(private bundleDir: string) {}

  /**
   * Factory to create and asynchronously initialize a PlcBundleManager instance.
   *
   * @param bundleDir Directory that holds the bundle files and the index.
   * @returns A manager whose index has been loaded.
   * @throws If the directory cannot be created or an existing index cannot be read or parsed.
   */
  public static async create(bundleDir: string): Promise<PlcBundleManager> {
    const manager = new PlcBundleManager(bundleDir);
    await manager.init();
    return manager;
  }

  /**
   * Initializes the manager by loading the index and seeding the `seenCIDs`
   * set with the CIDs from the last saved bundle's boundary.
   */
  private async init(): Promise<void> {
    await fs.mkdir(this.bundleDir, { recursive: true });
    this.index = await this._loadIndex();
    console.log(`plcbundle Reference Implementation\nDirectory: ${this.bundleDir}\n`);

    const lastBundle = this.index.bundles.at(-1);
    if (!lastBundle) {
      console.log('Starting from the beginning (genesis bundle).');
      return;
    }

    console.log(`Resuming from bundle ${lastBundle.bundle_number + 1}. Last op time: ${lastBundle.end_time}`);
    try {
      // Pre-seed the de-duplication set with CIDs from the previous bundle's boundary.
      const prevOps = await this._loadBundleOps(lastBundle.bundle_number);
      this.seenCIDs = PlcBundleManager._getBoundaryCIDs(prevOps);
      console.log(` Seeded de-duplication set with ${this.seenCIDs.size} boundary CIDs.`);
    } catch (e: unknown) {
      // Best effort: continue without the seed, but report why it failed.
      const reason = e instanceof Error ? e.message : String(e);
      console.warn(
        ` Warning: Could not load previous bundle file. Boundary deduplication may be incomplete.`,
        `(${reason})`,
      );
    }
  }

  /**
   * The main execution loop. Fetches, validates, de-duplicates, and bundles operations.
   *
   * The loop ends when the server returns no operations, when a fetch yields
   * nothing new without advancing the cursor, or on a non-retryable error.
   * The index is saved before returning in every case.
   */
  async run(): Promise<void> {
    let cursor: string | null = this.index.bundles.at(-1)?.end_time || null;

    while (true) {
      try {
        console.log(`\nFetching operations from cursor: ${cursor || 'start'}...`);
        const fetchedOps = await this._fetchOperations(cursor);
        const lastFetched = fetchedOps.at(-1);
        if (!lastFetched) {
          console.log('No more operations available.');
          break;
        }

        // The core ingestion logic: de-duplicate and validate operations before adding to the mempool.
        const added = this._processAndValidateOps(fetchedOps);

        // If nothing was new and the cursor would not move, the next request
        // would be identical to this one; stop instead of polling forever.
        if (added === 0 && lastFetched.createdAt === cursor) {
          console.log('No new operations beyond the current cursor; stopping.');
          break;
        }
        cursor = lastFetched.createdAt;

        // Create bundles as long as the mempool is full.
        while (this.mempool.length >= BUNDLE_SIZE) {
          await this._createAndSaveBundle();
        }

        await PlcBundleManager._sleep(200); // Be nice.
      } catch (err: unknown) {
        const details = err as
          | { message?: string; code?: string; response?: { status?: number } }
          | null
          | undefined;
        console.error(`\nError: ${details?.message ?? String(err)}`);
        if (details?.response) console.error(`HTTP Status: ${details.response.status}`);
        if (typeof details?.code === 'string' && PlcBundleManager.RETRYABLE_CODES.includes(details.code)) {
          console.log('Connection error, retrying in 5 seconds...');
          await PlcBundleManager._sleep(5000);
          continue;
        }
        break;
      }
    }

    await this._saveIndex();
    console.log(`\n---`);
    console.log('Process complete.');
    console.log(`Total bundles in index: ${this.index.bundles.length}`);
    console.log(`Operations in mempool: ${this.mempool.length}`);
    console.log(`Total size: ${(this.index.total_size_bytes / 1024 / 1024).toFixed(2)} MB`);
  }

  // ==========================================================================
  // Private Helper Methods
  // ==========================================================================

  /**
   * Requests up to 1000 operations from the export endpoint.
   *
   * @param after Cursor sent as the `after` query parameter; omitted when null or empty.
   * @returns The parsed operations, each carrying its raw JSON line in `_raw`.
   */
  private async _fetchOperations(after: string | null): Promise<PLCOperation[]> {
    const params = { count: 1000, ...(after && { after }) };
    // Request plain text so each line can be kept verbatim in `_raw`.
    const response = await axios.get<string>(`${PLC_URL}/export`, { params, responseType: 'text' });
    return PlcBundleManager._parseJSONL(response.data);
  }

  /**
   * Processes a batch of fetched operations. It ensures each operation is unique
   * (both within the batch and across bundle boundaries) and that it maintains
   * chronological order before adding it to the mempool.
   *
   * @param ops Operations in the order they were returned by the server.
   * @returns The number of operations appended to the mempool.
   * @throws If a new operation is older than the one before it.
   */
  private _processAndValidateOps(ops: PLCOperation[]): number {
    let lastTimestamp = this.mempool.at(-1)?.createdAt ?? this.index.bundles.at(-1)?.end_time ?? '';
    let newOpsCount = 0;

    for (const op of ops) {
      // Handles both boundary duplicates and duplicates within the batch.
      if (this.seenCIDs.has(op.cid)) {
        continue;
      }

      // Spec 3: Validate that the stream is chronological.
      if (op.createdAt < lastTimestamp) {
        throw new Error(`Chronological validation failed: op ${op.cid} at ${op.createdAt} is older than last op at ${lastTimestamp}`);
      }

      this.mempool.push(op);
      this.seenCIDs.add(op.cid); // Add the CID to the set only after it's confirmed valid.
      lastTimestamp = op.createdAt;
      newOpsCount++;
    }
    console.log(` Added ${newOpsCount} new operations to mempool.`);
    return newOpsCount;
  }

  /**
   * Creates a bundle from the first BUNDLE_SIZE mempool operations, writes it
   * to disk, records it in the index, and prunes the `seenCIDs` set to
   * maintain memory efficiency.
   */
  private async _createAndSaveBundle(): Promise<void> {
    const bundleOps = this.mempool.splice(0, BUNDLE_SIZE);
    const firstOp = bundleOps[0];
    const lastOp = bundleOps.at(-1);
    if (!firstOp || !lastOp) return; // Nothing to bundle.

    const previous = this.index.bundles.at(-1);
    const parentHash = previous?.hash ?? '';

    // Spec 4.2 & 6.3: Hashing and serialization must be exact.
    // Encode once so hashing, compression and the recorded size all use the same bytes.
    const jsonlBuffer = Buffer.from(PlcBundleManager._serializeJSONL(bundleOps), 'utf8');
    const contentHash = PlcBundleManager._sha256(jsonlBuffer);
    const chainHash = PlcBundleManager._calculateChainHash(parentHash, contentHash);
    const compressedBuffer = Buffer.from(compress(jsonlBuffer, 3));

    const currentBundleNumber = this.index.last_bundle + 1;
    const filename = `${String(currentBundleNumber).padStart(6, '0')}.jsonl.zst`;
    console.log(`\nCreating bundle ${filename}...`);
    await fs.writeFile(path.join(this.bundleDir, filename), compressedBuffer);

    this.index.bundles.push({
      bundle_number: currentBundleNumber,
      start_time: firstOp.createdAt,
      end_time: lastOp.createdAt,
      operation_count: bundleOps.length,
      did_count: new Set(bundleOps.map(op => op.did)).size,
      hash: chainHash,
      content_hash: contentHash,
      parent: parentHash,
      compressed_hash: PlcBundleManager._sha256(compressedBuffer),
      compressed_size: compressedBuffer.length,
      uncompressed_size: jsonlBuffer.length,
      cursor: previous?.end_time ?? '',
      created_at: new Date().toISOString(),
    });
    this.index.last_bundle = currentBundleNumber;
    this.index.total_size_bytes += compressedBuffer.length;

    // Prune the `seenCIDs` set to keep it memory-efficient. It only needs to hold CIDs
    // from the new boundary and the remaining mempool, not all CIDs ever seen.
    const newBoundaryCIDs = PlcBundleManager._getBoundaryCIDs(bundleOps);
    const mempoolCIDs = this.mempool.map(op => op.cid);
    this.seenCIDs = new Set([...newBoundaryCIDs, ...mempoolCIDs]);

    await this._saveIndex();
    console.log(` ✓ Saved. Hash: ${chainHash.substring(0, 16)}...`);
    console.log(` Pruned de-duplication set to ${this.seenCIDs.size} CIDs.`);
  }

  /**
   * Loads the index file from the bundle directory.
   *
   * @returns The stored index, or a fresh empty index when the file does not exist.
   * @throws If the file exists but cannot be read or does not contain a valid
   *   index. Starting over in that case would overwrite existing bundle files.
   */
  private async _loadIndex(): Promise<Index> {
    const indexPath = path.join(this.bundleDir, INDEX_FILE);

    let data: string;
    try {
      data = await fs.readFile(indexPath, 'utf8');
    } catch (err: unknown) {
      // Only a missing file means "first run"; anything else is a real failure.
      if ((err as { code?: unknown } | null | undefined)?.code === 'ENOENT') {
        return { version: '1.0', last_bundle: 0, updated_at: '', total_size_bytes: 0, bundles: [] };
      }
      throw err;
    }

    let parsed: unknown;
    try {
      parsed = JSON.parse(data);
    } catch (err: unknown) {
      const reason = err instanceof Error ? err.message : String(err);
      throw new Error(`Index file ${indexPath} is not valid JSON (${reason}); refusing to start over.`);
    }
    if (
      typeof parsed !== 'object' ||
      parsed === null ||
      !Array.isArray((parsed as { bundles?: unknown }).bundles)
    ) {
      throw new Error(`Index file ${indexPath} has no "bundles" array; refusing to start over.`);
    }
    return parsed as Index;
  }

  /**
   * Stamps `updated_at` and writes the index by writing a temporary file and
   * renaming it over the index file.
   */
  private async _saveIndex(): Promise<void> {
    this.index.updated_at = new Date().toISOString();
    const finalPath = path.join(this.bundleDir, INDEX_FILE);
    const tempPath = path.join(this.bundleDir, INDEX_FILE + '.tmp');
    await fs.writeFile(tempPath, JSON.stringify(this.index, null, 2));
    await fs.rename(tempPath, finalPath);
  }

  /**
   * Reads, decompresses and parses a previously saved bundle file.
   *
   * @param bundleNumber Number of the bundle; zero-padded to six digits in the file name.
   * @returns The bundle's operations in file order.
   */
  private async _loadBundleOps(bundleNumber: number): Promise<PLCOperation[]> {
    const filename = `${String(bundleNumber).padStart(6, '0')}.jsonl.zst`;
    const compressed = await fs.readFile(path.join(this.bundleDir, filename));
    const decompressed = Buffer.from(decompress(compressed)).toString('utf8');
    return PlcBundleManager._parseJSONL(decompressed);
  }

  // ==========================================================================
  // Static Utility Methods
  // ==========================================================================

  /** Resolves after `ms` milliseconds. */
  private static _sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Parses newline-delimited JSON into operations, keeping each source line in
   * `_raw`. Trailing whitespace is ignored; empty input yields an empty array.
   */
  private static _parseJSONL(text: string): PLCOperation[] {
    const body = text.trimEnd();
    if (body === '') return [];
    return body.split('\n').map(line => ({ ...JSON.parse(line), _raw: line }));
  }

  /** Returns the hex-encoded SHA-256 digest of `d`. */
  private static _sha256(d: string | Buffer): string {
    return crypto.createHash('sha256').update(d).digest('hex');
  }

  /** Joins the raw JSON lines of `ops`, terminating every line with '\n'. */
  private static _serializeJSONL(ops: PLCOperation[]): string {
    return ops.map(op => op._raw + '\n').join('');
  }

  /**
   * Computes the chain hash: SHA-256 of `parent:content`, or of
   * `plcbundle:genesis:content` when there is no parent hash.
   */
  private static _calculateChainHash(p: string, c: string): string {
    return PlcBundleManager._sha256(p ? `${p}:${c}` : `plcbundle:genesis:${c}`);
  }

  /**
   * Collects the CIDs of the trailing run of operations that share the last
   * operation's `createdAt`.
   */
  private static _getBoundaryCIDs(ops: PLCOperation[]): Set<string> {
    const cids = new Set<string>();
    const last = ops.at(-1);
    if (!last) return cids;
    for (let i = ops.length - 1; i >= 0; i--) {
      const op = ops[i];
      if (!op || op.createdAt !== last.createdAt) break;
      cids.add(op.cid);
    }
    return cids;
  }
}
273
// --- Entry Point ---
// Uses the first command-line argument as the bundle directory (falling back
// to DEFAULT_DIR), runs the manager, and exits with status 1 on any failure.
(async () => {
  const dir = process.argv[2] || DEFAULT_DIR;
  const manager = await PlcBundleManager.create(dir);
  await manager.run();
})().catch((err: unknown) => {
  if (err instanceof Error) {
    console.error('\nFATAL ERROR:', err.message, err.stack);
  } else {
    // Non-Error rejections have no message or stack; print the value itself.
    console.error('\nFATAL ERROR:', err);
  }
  process.exit(1);
});