tangled
alpha
login
or
join now
atscan.net
/
plcbundle-ref
5
fork
atom
PLC Bundle V1 Example Implementations
5
fork
atom
overview
issues
pulls
pipelines
fix
tree.fail
5 months ago
df6a2020
9b98b1b3
+59
-79
1 changed file
expand all
collapse all
unified
split
plcbundle.ts
+59
-79
plcbundle.ts
reviewed
···
1
1
#!/usr/bin/env node
2
2
3
3
/**
4
4
-
* plcbundle.ts - A reference implementation for fetching PLC directory
5
5
-
* operations and creating verifiable, chained bundles according to the plcbundle V1 spec.
6
6
-
*
7
7
-
* This script fetches operations, validates their order, de-duplicates them,
8
8
-
* and groups them into 10,000-operation bundles. Each bundle is compressed,
9
9
-
* hashed, and cryptographically linked to the previous one, creating a verifiable
10
10
-
* chain of data.
4
4
+
* plcbundle.ts - A compact, readable reference implementation for creating
5
5
+
* plcbundle V1 compliant archives. This script demonstrates all critical spec
6
6
+
* requirements, including hashing, serialization, ordering, and boundary handling.
11
7
*/
12
8
13
9
import fs from 'fs/promises';
···
19
15
// --- Configuration ---
20
16
const BUNDLE_SIZE = 10000;
21
17
const INDEX_FILE = 'plc_bundles.json';
22
22
-
const DEFAULT_DIR = './plc_bundles';
18
18
+
const DEFAULT_DIR = './plc_bundles_ts';
23
19
const PLC_URL = 'https://plc.directory';
24
20
25
21
// --- Types (as per spec) ---
26
22
interface PLCOperation {
23
23
+
did: string;
27
24
cid: string;
28
25
createdAt: string;
26
26
+
operation: Record<string, any>;
27
27
+
nullified?: boolean | string;
29
28
_raw: string; // Holds the original raw JSON string for reproducibility.
30
29
}
31
30
···
60
59
* Manages the state and process of fetching, validating, and creating PLC bundles.
61
60
*/
62
61
class PlcBundleManager {
63
63
-
private bundleDir: string;
64
62
private index!: Index;
65
63
private mempool: PLCOperation[] = [];
66
66
-
// This set is used to de-duplicate operations. It tracks CIDs from the
67
67
-
// previous bundle's boundary and all CIDs from the current mempool.
64
64
+
// This set correctly de-duplicates operations, both from the previous bundle's
65
65
+
// boundary and within new batches, and is pruned to stay memory-efficient.
68
66
private seenCIDs = new Set<string>();
69
67
70
70
-
constructor(bundleDir: string) {
71
71
-
this.bundleDir = bundleDir;
68
68
+
private constructor(private bundleDir: string) {}
69
69
+
70
70
+
/**
71
71
+
* Factory to create and asynchronously initialize a PlcBundleManager instance.
72
72
+
*/
73
73
+
public static async create(bundleDir: string): Promise<PlcBundleManager> {
74
74
+
const manager = new PlcBundleManager(bundleDir);
75
75
+
await manager.init();
76
76
+
return manager;
72
77
}
73
78
74
79
/**
75
80
* Initializes the manager by loading the index and seeding the `seenCIDs`
76
81
* set with the CIDs from the last saved bundle's boundary.
77
82
*/
78
78
-
async init() {
83
83
+
private async init() {
79
84
await fs.mkdir(this.bundleDir, { recursive: true });
80
85
this.index = await this._loadIndex();
81
86
console.log(`plcbundle Reference Implementation\nDirectory: ${this.bundleDir}\n`);
···
85
90
console.log(`Resuming from bundle ${lastBundle.bundle_number + 1}. Last op time: ${lastBundle.end_time}`);
86
91
try {
87
92
// Pre-seed the de-duplication set with CIDs from the previous bundle's boundary.
88
88
-
// This is crucial for preventing duplicates between two adjacent bundles.
89
93
const prevOps = await this._loadBundleOps(lastBundle.bundle_number);
90
90
-
this.seenCIDs = this._getBoundaryCIDs(prevOps);
94
94
+
this.seenCIDs = PlcBundleManager._getBoundaryCIDs(prevOps);
91
95
console.log(` Seeded de-duplication set with ${this.seenCIDs.size} boundary CIDs.`);
92
96
} catch (e) {
93
97
console.warn(` Warning: Could not load previous bundle file. Boundary deduplication may be incomplete.`);
···
98
102
}
99
103
100
104
/**
101
101
-
* The main execution loop. It continuously fetches operations, validates and
102
102
-
* de-duplicates them, fills the mempool, and creates bundles when ready.
105
105
+
* The main execution loop. Fetches, validates, de-duplicates, and bundles operations.
103
106
*/
104
107
async run() {
105
108
let cursor = this.index.bundles.at(-1)?.end_time || null;
···
108
111
try {
109
112
console.log(`\nFetching operations from cursor: ${cursor || 'start'}...`);
110
113
const fetchedOps = await this._fetchOperations(cursor);
111
111
-
112
114
if (fetchedOps.length === 0) {
113
113
-
console.log('No more operations available from PLC directory.');
115
115
+
console.log('No more operations available.');
114
116
break;
115
117
}
116
116
-
118
118
+
117
119
// The core ingestion logic: de-duplicate and validate operations before adding to the mempool.
118
120
this._processAndValidateOps(fetchedOps);
119
119
-
120
120
-
// The cursor for the next fetch is always the timestamp of the last operation received in the current batch.
121
121
-
cursor = fetchedOps[fetchedOps.length - 1].createdAt;
121
121
+
cursor = fetchedOps.at(-1)!.createdAt;
122
122
123
123
-
// If the mempool is full enough, create bundles. This can run multiple times per fetch.
123
123
+
// Create bundles as long as the mempool is full.
124
124
while (this.mempool.length >= BUNDLE_SIZE) {
125
125
await this._createAndSaveBundle();
126
126
}
127
127
128
128
-
await new Promise(resolve => setTimeout(resolve, 200)); // Be nice to the server.
128
128
+
await new Promise(resolve => setTimeout(resolve, 200)); // Be nice.
129
129
} catch (err: any) {
130
130
console.error(`\nError: ${err.message}`);
131
131
if (err.response) console.error(`HTTP Status: ${err.response.status}`);
···
155
155
const response = await axios.get<string>(`${PLC_URL}/export`, { params, responseType: 'text' });
156
156
const lines = response.data.trimEnd().split('\n');
157
157
if (lines.length === 1 && lines[0] === '') return [];
158
158
-
// Important: The `_raw` property is added here to preserve the original JSON string,
159
159
-
// ensuring byte-for-byte reproducibility as required by Spec 4.2.
160
158
return lines.map(line => ({ ...JSON.parse(line), _raw: line }));
161
159
}
162
160
···
166
164
* chronological order before adding it to the mempool.
167
165
*/
168
166
private _processAndValidateOps(ops: PLCOperation[]) {
169
169
-
// The timestamp to validate against is the last operation in the mempool, or if empty,
170
170
-
// the end time of the last bundle. This prevents chronological gaps.
171
171
-
let lastTimestamp = this.mempool.at(-1)?.createdAt || this.index.bundles.at(-1)?.end_time || '';
167
167
+
let lastTimestamp = this.mempool.at(-1)?.createdAt ?? this.index.bundles.at(-1)?.end_time ?? '';
172
168
let newOpsCount = 0;
173
169
174
170
for (const op of ops) {
175
175
-
// The `seenCIDs` set efficiently handles duplicates from the previous bundle's
176
176
-
// boundary as well as any duplicates within the current fetched batch.
171
171
+
// This check now correctly handles both boundary dupes and within-batch dupes.
177
172
if (this.seenCIDs.has(op.cid)) {
178
173
continue;
179
174
}
180
175
181
181
-
// Spec 3: Validate that the stream is chronological. This is a critical sanity check.
176
176
+
// Spec 3: Validate that the stream is chronological.
182
177
if (op.createdAt < lastTimestamp) {
183
178
throw new Error(`Chronological validation failed: op ${op.cid} at ${op.createdAt} is older than last op at ${lastTimestamp}`);
184
179
}
···
192
187
}
193
188
194
189
/**
195
195
-
* Takes 10,000 operations from the mempool, creates a bundle file, generates
196
196
-
* its metadata according to the spec, and updates the index.
190
190
+
* Creates a bundle and prunes the `seenCIDs` set to maintain memory efficiency.
197
191
*/
198
192
private async _createAndSaveBundle() {
199
199
-
const currentBundleNumber = this.index.last_bundle + 1;
200
193
const bundleOps = this.mempool.splice(0, BUNDLE_SIZE);
201
201
-
202
202
-
const parentHash = this.index.bundles.at(-1)?.hash || '';
203
203
-
const previousCursor = this.index.bundles.at(-1)?.end_time || '';
204
204
-
205
205
-
// The hashing and serialization process follows the spec exactly to ensure compatibility.
194
194
+
const parentHash = this.index.bundles.at(-1)?.hash ?? '';
195
195
+
196
196
+
// Spec 4.2 & 6.3: Hashing and serialization must be exact.
206
197
const jsonl = PlcBundleManager._serializeJSONL(bundleOps);
207
207
-
const uncompressedBuffer = Buffer.from(jsonl, 'utf8');
208
208
-
const contentHash = PlcBundleManager._sha256(uncompressedBuffer);
198
198
+
const contentHash = PlcBundleManager._sha256(Buffer.from(jsonl, 'utf8'));
209
199
const chainHash = PlcBundleManager._calculateChainHash(parentHash, contentHash);
210
210
-
const compressedBuffer = Buffer.from(compress(uncompressedBuffer, 3));
200
200
+
const compressedBuffer = Buffer.from(compress(Buffer.from(jsonl, 'utf8'), 3));
211
201
202
202
+
const currentBundleNumber = this.index.last_bundle + 1;
212
203
const filename = `${String(currentBundleNumber).padStart(6, '0')}.jsonl.zst`;
213
204
await fs.writeFile(path.join(this.bundleDir, filename), compressedBuffer);
214
205
215
215
-
const dids = new Set(bundleOps.map(op => JSON.parse(op._raw).did));
216
216
-
const metadata: BundleMetadata = {
206
206
+
this.index.bundles.push({
217
207
bundle_number: currentBundleNumber,
218
208
start_time: bundleOps[0].createdAt,
219
219
-
end_time: bundleOps[bundleOps.length - 1].createdAt,
209
209
+
end_time: bundleOps.at(-1)!.createdAt,
220
210
operation_count: bundleOps.length,
221
221
-
did_count: dids.size,
222
222
-
hash: chainHash,
223
223
-
content_hash: contentHash,
224
224
-
parent: parentHash,
211
211
+
did_count: new Set(bundleOps.map(op => op.did)).size,
212
212
+
hash: chainHash, content_hash: contentHash, parent: parentHash,
225
213
compressed_hash: PlcBundleManager._sha256(compressedBuffer),
226
214
compressed_size: compressedBuffer.length,
227
227
-
uncompressed_size: uncompressedBuffer.length,
228
228
-
cursor: previousCursor,
215
215
+
uncompressed_size: Buffer.from(jsonl, 'utf8').length,
216
216
+
cursor: this.index.bundles.at(-1)?.end_time ?? '',
229
217
created_at: new Date().toISOString()
230
230
-
};
231
231
-
232
232
-
this.index.bundles.push(metadata);
218
218
+
});
233
219
this.index.last_bundle = currentBundleNumber;
234
234
-
this.index.total_size_bytes += metadata.compressed_size;
220
220
+
this.index.total_size_bytes += compressedBuffer.length;
235
221
236
222
// Prune the `seenCIDs` set to keep it memory-efficient. It only needs to hold CIDs
237
223
// from the new boundary and the remaining mempool, not all CIDs ever seen.
238
238
-
const newBoundaryCIDs = this._getBoundaryCIDs(bundleOps);
224
224
+
const newBoundaryCIDs = PlcBundleManager._getBoundaryCIDs(bundleOps);
239
225
const mempoolCIDs = new Set(this.mempool.map(op => op.cid));
240
226
this.seenCIDs = new Set([...newBoundaryCIDs, ...mempoolCIDs]);
241
227
242
228
await this._saveIndex();
243
229
console.log(`\nCreating bundle ${filename}...`);
244
244
-
console.log(` ✓ Saved. Hash: ${metadata.hash.substring(0, 16)}...`);
245
245
-
console.log(` Set new boundary with ${newBoundaryCIDs.size} CIDs. Pruned de-duplication set to ${this.seenCIDs.size} CIDs.`);
230
230
+
console.log(` ✓ Saved. Hash: ${chainHash.substring(0, 16)}...`);
231
231
+
console.log(` Pruned de-duplication set to ${this.seenCIDs.size} CIDs.`);
246
232
}
247
233
248
234
private async _loadIndex(): Promise<Index> {
···
263
249
264
250
private async _loadBundleOps(bundleNumber: number): Promise<PLCOperation[]> {
265
251
const filename = `${String(bundleNumber).padStart(6, '0')}.jsonl.zst`;
266
266
-
const filepath = path.join(this.bundleDir, filename);
267
267
-
const compressed = await fs.readFile(filepath);
252
252
+
const compressed = await fs.readFile(path.join(this.bundleDir, filename));
268
253
const decompressed = Buffer.from(decompress(compressed)).toString('utf8');
269
254
return decompressed.trimEnd().split('\n').map(line => ({...JSON.parse(line), _raw: line}));
270
255
}
256
256
+
257
257
+
// ==========================================================================
258
258
+
// Static Utility Methods
259
259
+
// ==========================================================================
271
260
272
272
-
/** Returns CIDs from the last timestamp of a bundle, used for boundary de-duplication. */
273
273
-
private _getBoundaryCIDs(ops: PLCOperation[]): Set<string> {
261
261
+
private static _sha256 = (d: string | Buffer): string => crypto.createHash('sha256').update(d).digest('hex');
262
262
+
private static _serializeJSONL = (ops: PLCOperation[]): string => ops.map(op => op._raw + '\n').join('');
263
263
+
private static _calculateChainHash = (p: string, c: string): string => PlcBundleManager._sha256(p ? `${p}:${c}` : `plcbundle:genesis:${c}`);
264
264
+
private static _getBoundaryCIDs = (ops: PLCOperation[]): Set<string> => {
274
265
if (!ops.length) return new Set();
275
266
const lastTime = ops.at(-1)!.createdAt;
276
267
const cids = new Set<string>();
277
277
-
for (let i = ops.length - 1; i >= 0 && ops[i].createdAt === lastTime; i--) {
278
278
-
cids.add(ops[i].cid);
279
279
-
}
268
268
+
for (let i = ops.length - 1; i >= 0 && ops[i].createdAt === lastTime; i--) cids.add(ops[i].cid);
280
269
return cids;
281
281
-
}
282
282
-
283
283
-
// --- Static Utilities ---
284
284
-
private static _sha256 = (data: string | Buffer): string => crypto.createHash('sha256').update(data).digest('hex');
285
285
-
private static _serializeJSONL = (ops: PLCOperation[]): string => ops.map(op => op._raw + '\n').join('');
286
286
-
private static _calculateChainHash = (parent: string, contentHash: string): string => {
287
287
-
return PlcBundleManager._sha256(parent ? `${parent}:${contentHash}` : `plcbundle:genesis:${contentHash}`);
288
270
};
289
271
}
290
272
291
273
// --- Entry Point ---
292
274
(async () => {
293
275
const dir = process.argv[2] || DEFAULT_DIR;
294
294
-
const manager = new PlcBundleManager(dir);
295
295
-
await manager.init();
276
276
+
const manager = await PlcBundleManager.create(dir);
296
277
await manager.run();
297
278
})().catch(err => {
298
298
-
console.error('\nFATAL ERROR:', err.message);
299
299
-
console.error(err.stack);
279
279
+
console.error('\nFATAL ERROR:', err.message, err.stack);
300
280
process.exit(1);
301
281
});