⚡ Zero-dependency plcbundle library exclusively for Bun
at main 32 kB view raw
import type {
  BundleIndex,
  BundleMetadata,
  Operation,
  ProcessCallback,
  ProcessOptions,
  ProcessStats,
  CloneOptions,
  CloneStats,
  ChainVerificationResult
} from './types';

/**
 * Calculate SHA-256 hash using Bun's native hasher.
 *
 * @param data - Bytes or string to hash
 * @returns Lowercase hex digest
 */
function sha256(data: Uint8Array | string): string {
  const hasher = new Bun.CryptoHasher("sha256");
  hasher.update(data);
  return Buffer.from(hasher.digest()).toString('hex');
}

/**
 * Turn an unknown thrown value into a short human-readable message.
 */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Main class for reading and processing PLC bundle archives.
 *
 * This class provides methods for:
 * - Cloning bundles from remote repositories
 * - Reading and verifying local bundles
 * - Streaming operations from bundles
 * - Processing bundles with custom callbacks
 *
 * All I/O, hashing and decompression go through Bun's built-in APIs.
 *
 * @example Basic usage
 * ```ts
 * const bundle = new PLCBundle('./my-bundles');
 *
 * // Get repository information
 * const stats = await bundle.getStats();
 * console.log(`Repository has ${stats.lastBundle} bundles`);
 *
 * // Stream operations from a bundle
 * for await (const { op } of bundle.streamOperations(1)) {
 *   console.log(op.did);
 * }
 * ```
 *
 * @example Clone from remote
 * ```ts
 * const bundle = new PLCBundle('./bundles');
 *
 * await bundle.clone('https://plcbundle.atscan.net', {
 *   bundles: '1-100',
 *   threads: 8,
 *   verify: true,
 *   onProgress: (stats) => {
 *     console.log(`${stats.downloadedBundles}/${stats.totalBundles}`);
 *   }
 * });
 * ```
 *
 * @example Process with callback
 * ```ts
 * await bundle.processBundles(1, 10, (op, pos, num) => {
 *   if (op.did.startsWith('did:plc:')) {
 *     console.log(`Found DID: ${op.did}`);
 *   }
 * }, {
 *   onProgress: (stats) => console.log(`${stats.totalOps} ops`)
 * });
 * ```
 */
export class PLCBundle {
  private dir: string;
  private indexPath: string;
  private cachedIndex?: BundleIndex;

  /**
   * Create a new PLCBundle instance.
   *
   * @param dir - Directory containing bundle files (default: './')
   * @param indexPath - Path to the index file (default: `${dir}/plc_bundles.json`)
   *
   * @example
   * ```ts
   * // Use default directory
   * const bundle1 = new PLCBundle();
   *
   * // Specify custom directory
   * const bundle2 = new PLCBundle('./my-bundles');
   *
   * // Custom directory and index path
   * const bundle3 = new PLCBundle('./bundles', './custom-index.json');
   * ```
   */
  constructor(dir: string = './', indexPath?: string) {
    // Normalise to a trailing slash so paths can be built by concatenation.
    this.dir = dir.endsWith('/') ? dir : `${dir}/`;
    this.indexPath = indexPath || `${this.dir}plc_bundles.json`;
  }

  /**
   * Load the bundle index from disk.
   *
   * The index is cached in memory after first load. Use `refresh` parameter
   * to force reloading from disk.
   *
   * @param refresh - If true, reload from disk even if cached (default: false)
   * @returns Promise resolving to the bundle index
   * @throws Error if index file cannot be read or parsed
   *
   * @example
   * ```ts
   * // Load index (uses cache if available)
   * const index = await bundle.loadIndex();
   *
   * // Force reload from disk
   * const freshIndex = await bundle.loadIndex(true);
   * ```
   */
  async loadIndex(refresh = false): Promise<BundleIndex> {
    if (!refresh && this.cachedIndex) {
      return this.cachedIndex;
    }

    const index: BundleIndex = await Bun.file(this.indexPath).json();
    this.cachedIndex = index;
    return index;
  }

  /**
   * Save a bundle index to disk.
   *
   * Writes the index as 2-space formatted JSON via `Bun.write` and updates
   * the in-memory cache.
   *
   * @param index - The bundle index to save
   * @returns Promise that resolves when save is complete
   *
   * @example
   * ```ts
   * const index = await bundle.loadIndex();
   * index.last_bundle = 150;
   * await bundle.saveIndex(index);
   * ```
   */
  async saveIndex(index: BundleIndex): Promise<void> {
    await Bun.write(this.indexPath, JSON.stringify(index, null, 2));
    this.cachedIndex = index;
  }

  /**
   * Get metadata for a specific bundle.
   *
   * @param bundleNum - The bundle number to retrieve metadata for
   * @returns Promise resolving to bundle metadata, or undefined if not found
   *
   * @example
   * ```ts
   * const metadata = await bundle.getMetadata(42);
   * if (metadata) {
   *   console.log(`Bundle ${metadata.bundle_number} has ${metadata.operation_count} operations`);
   * }
   * ```
   */
  async getMetadata(bundleNum: number): Promise<BundleMetadata | undefined> {
    const index = await this.loadIndex();
    return index.bundles.find(b => b.bundle_number === bundleNum);
  }

  /**
   * Get the file path for a specific bundle.
   *
   * Bundles are named with zero-padded 6-digit numbers (e.g., `000042.jsonl.zst`).
   *
   * @param bundleNum - The bundle number
   * @returns Full path to the bundle file
   *
   * @example
   * ```ts
   * const path = bundle.getBundlePath(42);
   * // Returns: "./bundles/000042.jsonl.zst"
   * ```
   */
  getBundlePath(bundleNum: number): string {
    return `${this.dir}${bundleNum.toString().padStart(6, '0')}.jsonl.zst`;
  }

  /**
   * Read and decompress a bundle file.
   *
   * Uses Bun's built-in zstd decompression.
   *
   * @param bundleNum - The bundle number to read
   * @returns Promise resolving to the decompressed JSONL content as a string
   * @throws Error if bundle file cannot be read or decompressed
   *
   * @example
   * ```ts
   * const jsonl = await bundle.readBundle(1);
   * console.log(`Bundle size: ${jsonl.length} characters`);
   * ```
   */
  async readBundle(bundleNum: number): Promise<string> {
    const path = this.getBundlePath(bundleNum);
    const compressed = await Bun.file(path).arrayBuffer();
    const decompressed = Bun.zstdDecompressSync(compressed);
    return new TextDecoder().decode(decompressed);
  }

  /**
   * Parse operations from JSONL content.
   *
   * Blank (whitespace-only) lines are skipped.
   *
   * @param content - JSONL string with one operation per line
   * @returns Array of parsed operations
   * @throws SyntaxError if a non-blank line is not valid JSON
   *
   * @example
   * ```ts
   * const jsonl = await bundle.readBundle(1);
   * const operations = bundle.parseOperations(jsonl);
   * console.log(`Parsed ${operations.length} operations`);
   * ```
   */
  parseOperations(content: string): Operation[] {
    return content
      .split('\n')
      .filter(line => line.trim())
      .map(line => JSON.parse(line));
  }

  /**
   * Stream operations from a bundle.
   *
   * This async generator yields one parsed operation at a time together with
   * its raw JSONL line. The bundle itself is still read and decompressed in
   * one piece; only the parsing is lazy.
   *
   * @param bundleNum - The bundle number to stream from
   * @yields `{ op, line }` pairs for every non-blank line of the bundle
   *
   * @example
   * ```ts
   * for await (const { op } of bundle.streamOperations(1)) {
   *   console.log(op.did);
   *   if (someCondition) break; // Can stop early
   * }
   * ```
   */
  async *streamOperations(bundleNum: number): AsyncGenerator<{ op: Operation; line: string }> {
    const content = await this.readBundle(bundleNum);

    for (const line of content.split('\n')) {
      if (line.trim()) {
        yield { op: JSON.parse(line), line };
      }
    }
  }

  /**
   * Process multiple bundles with a callback or module.
   *
   * Two modes:
   * 1. **Direct callback** (single-threaded): Pass callback function
   * 2. **Module path** (single- or multi-threaded): Pass module path in options.
   *    The module's `process` export is preferred, then `detect`, then `default`.
   *
   * @param start - First bundle number
   * @param end - Last bundle number
   * @param callbackOrOptions - Callback function OR options object with module path
   * @param options - Processing options (only if callback is provided)
   * @returns Promise resolving to processing statistics
   * @throws Error if `threads > 1` without a module, if neither a callback nor
   *   a loadable module is given, or if the module exports no usable function
   *
   * @example Single-threaded with callback
   * ```ts
   * await bundle.processBundles(1, 10, (op, pos, num, line) => {
   *   console.log(op.did);
   * });
   * ```
   *
   * @example Multi-threaded with module
   * ```ts
   * await bundle.processBundles(1, 100, {
   *   module: './detect.ts',
   *   threads: 4,
   *   onProgress: (stats) => console.log(stats.totalOps)
   * });
   * ```
   */
  async processBundles(
    start: number,
    end: number,
    callbackOrOptions: ProcessCallback | ProcessOptions,
    options?: ProcessOptions
  ): Promise<ProcessStats & { matches?: any[] }> {
    let callback: ProcessCallback | undefined;
    let processOptions: ProcessOptions;

    if (typeof callbackOrOptions === 'function') {
      callback = callbackOrOptions;
      processOptions = options ?? {};
    } else {
      // Tolerate a missing third argument from untyped callers; the
      // "callback or module" check below reports it properly.
      processOptions = callbackOrOptions ?? {};
    }

    const { threads = 1, module, silent = false, flush = false, onProgress, onMatch } = processOptions;

    // Validation: multi-threading requires module
    if (threads > 1 && !module) {
      throw new Error('Multi-threading requires module path. Use: processBundles(start, end, { module: "./detect.ts", threads: 4 })');
    }

    // Determine mode based on what function is exported
    let mode: 'detect' | 'process' = 'detect';
    let mod: any;
    let importError: unknown;
    if (module) {
      try {
        mod = await import(module);
        if (mod.process) {
          mode = 'process';
        }
      } catch (e) {
        // Best effort: workers import the module themselves, so only remember
        // the failure and report it if the single-threaded path needs it.
        importError = e;
      }
    }

    // Use workers for multi-threading with module
    if (threads > 1 && module) {
      return await this.processBundlesWorkers(
        start,
        end,
        module,
        threads,
        silent,
        flush,
        mode,
        onProgress,
        onMatch
      );
    }

    if (!callback && module) {
      if (!mod) {
        throw new Error(`Failed to load module "${module}": ${describeError(importError)}`);
      }

      const userFn = mode === 'detect' ? (mod.detect || mod.default) : (mod.process || mod.default);
      if (typeof userFn !== 'function') {
        throw new Error(`Module "${module}" must export a "detect", "process" or default function`);
      }

      // Return the user function's result so an async detect/process is
      // awaited by the processing loop instead of running unobserved.
      callback = (op, position, bundleNum, line) =>
        mode === 'detect'
          ? userFn({ op })
          : userFn({ op, position, bundle: bundleNum, line });
    }

    if (!callback) {
      throw new Error('Either callback function or module path must be provided');
    }

    // Single-threaded fast path
    return await this.processBundlesFast(start, end, callback, onProgress);
  }

  /**
   * Multi-threaded processing using workers.
   *
   * Splits `start..end` into contiguous slices, one per worker, forwards
   * `progress` and `match` messages, and resolves once every worker has sent
   * its `result` message. Workers are always terminated, also on failure.
   *
   * @throws Error if a worker raises an `error` event
   */
  private async processBundlesWorkers(
    start: number,
    end: number,
    modulePath: string,
    threads: number,
    silent: boolean,
    flush: boolean,
    mode: 'detect' | 'process',
    onProgress?: (stats: ProcessStats) => void,
    onMatch?: (match: any) => void
  ): Promise<ProcessStats & { matches?: any[] }> {
    const totalBundles = end - start + 1;
    const bundlesPerThread = Math.ceil(totalBundles / threads);

    const workerPath = new URL('./worker.ts', import.meta.url).pathname;
    const workers: Worker[] = [];
    const workerPromises: Promise<any>[] = [];

    let aggregatedOps = 0;
    let aggregatedBytes = 0;
    let results: any[];

    try {
      for (let i = 0; i < threads; i++) {
        const threadStart = start + i * bundlesPerThread;
        const threadEnd = Math.min(threadStart + bundlesPerThread - 1, end);

        if (threadStart > end) break;

        const worker = new Worker(workerPath);
        workers.push(worker);

        // Last totals reported by this worker; progress messages carry
        // running totals, so only the delta is added to the aggregate.
        let seenOps = 0;
        let seenBytes = 0;

        workerPromises.push(new Promise<any>((resolve, reject) => {
          worker.onmessage = (event) => {
            const msg = event.data;

            if (msg.type === 'progress') {
              aggregatedOps += msg.totalOps - seenOps;
              aggregatedBytes += msg.totalBytes - seenBytes;
              seenOps = msg.totalOps;
              seenBytes = msg.totalBytes;

              if (onProgress) {
                onProgress({
                  totalOps: aggregatedOps,
                  matchCount: 0,
                  totalBytes: aggregatedBytes,
                  matchedBytes: 0,
                });
              }
            } else if (msg.type === 'match') {
              // Flushed match - hand it to the callback immediately
              if (onMatch) {
                onMatch(msg);
              }
            } else if (msg.type === 'result') {
              resolve(msg);
            }
          };

          // Without this a crashing worker would never send 'result' and
          // the whole call would hang forever.
          worker.onerror = (event) => {
            reject(new Error(`Worker ${i} (bundles ${threadStart}-${threadEnd}) failed: ${event.message}`));
          };
        }));

        worker.postMessage({
          dir: this.dir,
          start: threadStart,
          end: threadEnd,
          modulePath,
          silent,
          flush,
          mode,
        });
      }

      results = await Promise.all(workerPromises);
    } finally {
      workers.forEach(w => w.terminate());
    }

    // Give the user module a chance to post-process the per-worker results.
    const mod = await import(modulePath);
    if (mod.finalize) {
      await mod.finalize(results, { aggregate: this.aggregate });
    }

    // Aggregate results
    let totalOps = 0;
    let totalBytes = 0;
    const allMatches: any[] = [];

    for (const result of results) {
      totalOps += result.totalOps;
      totalBytes += result.totalBytes;
      if (!flush) {
        // Element-wise push: tolerates a missing `matches` field and avoids
        // the argument-count limit of push(...hugeArray).
        for (const match of result.matches ?? []) {
          allMatches.push(match);
        }
      }
    }

    // Restore global (bundle, position) order across workers
    if (!flush && mode === 'detect') {
      allMatches.sort((a, b) => {
        if (a.bundle !== b.bundle) return a.bundle - b.bundle;
        return a.position - b.position;
      });
    }

    return {
      totalOps,
      matchCount: 0,
      totalBytes,
      matchedBytes: 0,
      matches: flush || mode === 'process' ? undefined : allMatches,
    };
  }

  /**
   * Sum numeric dictionaries key by key.
   *
   * Passed to a module's `finalize` hook as a helper; does not use `this`.
   *
   * @param objects - Dictionaries of counters
   * @returns One dictionary holding the per-key sums
   */
  private aggregate(objects: Array<{ [key: string]: number }>): { [key: string]: number } {
    const aggregatedDict: { [key: string]: number } = {};

    for (const currentObj of objects) {
      for (const [key, value] of Object.entries(currentObj)) {
        aggregatedDict[key] = (aggregatedDict[key] || 0) + value;
      }
    }

    return aggregatedDict;
  }

  /**
   * Fast single-threaded processing.
   *
   * Reads each bundle in turn and awaits `callback` for every non-blank line.
   * `position` is the line index within the bundle (blank lines included in
   * the count). `totalBytes` accumulates `line.length`, i.e. string length,
   * not encoded byte size. `onProgress` fires every 10000 operations.
   */
  private async processBundlesFast(
    start: number,
    end: number,
    callback: ProcessCallback,
    onProgress?: (stats: ProcessStats) => void
  ): Promise<ProcessStats> {
    const stats: ProcessStats = {
      totalOps: 0,
      matchCount: 0,
      totalBytes: 0,
      matchedBytes: 0,
    };

    for (let bundleNum = start; bundleNum <= end; bundleNum++) {
      const content = await this.readBundle(bundleNum);
      const lines = content.split('\n');

      for (let position = 0; position < lines.length; position++) {
        const line = lines[position];
        if (!line.trim()) continue;

        stats.totalOps++;
        stats.totalBytes += line.length;

        const op = JSON.parse(line);
        await callback(op, position, bundleNum, line);

        if (onProgress && stats.totalOps % 10000 === 0) {
          onProgress({ ...stats });
        }
      }
    }

    return stats;
  }

  /**
   * Clone bundles from a remote repository.
   *
   * Downloads bundles via HTTP, optionally verifies their SHA-256 hash, and
   * saves progress periodically. Supports resuming interrupted downloads:
   * bundles listed in the local index whose file exists with the expected
   * compressed size are skipped.
   *
   * The local index is saved every 5 seconds and on completion. It only ever
   * lists bundles that are present locally, so it equals the remote index
   * once everything has been downloaded. Individual download failures do not
   * abort the clone; they are counted in `failedBundles`.
   *
   * @param remoteUrl - Base URL of the remote bundle repository
   * @param options - Clone options (bundles, threads, verification, callbacks)
   * @returns Promise resolving to clone statistics
   * @throws Error if the remote index cannot be fetched or the bundle range is invalid
   *
   * @example Clone all bundles
   * ```ts
   * await bundle.clone('https://plcbundle.atscan.net', {
   *   threads: 8,
   *   verify: true
   * });
   * ```
   *
   * @example Clone specific range with progress
   * ```ts
   * await bundle.clone('https://plcbundle.atscan.net', {
   *   bundles: '1-100',
   *   threads: 4,
   *   onProgress: (stats) => {
   *     const pct = (stats.downloadedBytes / stats.totalBytes * 100).toFixed(1);
   *     console.log(`${pct}% complete`);
   *   }
   * });
   * ```
   *
   * @example Resume interrupted download
   * ```ts
   * // First run - interrupted
   * await bundle.clone('https://plcbundle.atscan.net', { bundles: '1-1000' });
   *
   * // Second run - automatically resumes
   * await bundle.clone('https://plcbundle.atscan.net', { bundles: '1-1000' });
   * ```
   */
  async clone(
    remoteUrl: string,
    options: CloneOptions = {}
  ): Promise<CloneStats> {
    const { threads = 4, bundles, verify = true, shouldStop, onProgress } = options;

    // A batch size below 1 would make the download loop spin forever.
    const batchSize = Number.isFinite(threads) && threads >= 1 ? Math.floor(threads) : 1;

    // Ensure remote URL doesn't end with /
    const baseUrl = remoteUrl.endsWith('/') ? remoteUrl.slice(0, -1) : remoteUrl;

    // Ensure local directory exists (Bun.write creates missing parents)
    await Bun.write(Bun.file(this.dir + '.keep'), '');

    // Try to load existing local index (for resume)
    const alreadyDownloaded = new Set<number>();
    try {
      const localIndex = await this.loadIndex();
      for (const bundle of localIndex.bundles) {
        alreadyDownloaded.add(bundle.bundle_number);
      }
    } catch {
      // No local index yet, that's fine
    }

    // Download remote index
    const indexResponse = await fetch(`${baseUrl}/index.json`);
    if (!indexResponse.ok) {
      throw new Error(`Failed to fetch index: ${indexResponse.statusText}`);
    }
    const remoteIndex: BundleIndex = await indexResponse.json();

    // Parse bundle selection: "N" or "N-M"; default is everything
    let start: number;
    let end: number;
    if (bundles) {
      const [startStr = '', endStr = ''] = bundles.includes('-')
        ? bundles.split('-')
        : [bundles, bundles];
      start = Number.parseInt(startStr.trim(), 10);
      end = Number.parseInt(endStr.trim(), 10);
    } else {
      start = 1;
      end = remoteIndex.last_bundle;
    }

    // Validate range. The integer checks matter: NaN compares false against
    // everything and would otherwise slip through the bounds test.
    if (
      !Number.isInteger(start) ||
      !Number.isInteger(end) ||
      start < 1 ||
      end > remoteIndex.last_bundle ||
      start > end
    ) {
      throw new Error(`Invalid bundle range ${start}-${end} (available: 1-${remoteIndex.last_bundle})`);
    }

    // Get all bundles in range
    const allBundlesInRange = remoteIndex.bundles.filter(
      b => b.bundle_number >= start && b.bundle_number <= end
    );

    // Split into "listed locally, needs a check" and "must download"
    const bundlesToCheck: BundleMetadata[] = [];
    const bundlesToDownload: BundleMetadata[] = [];
    for (const bundle of allBundlesInRange) {
      if (alreadyDownloaded.has(bundle.bundle_number)) {
        bundlesToCheck.push(bundle);
      } else {
        bundlesToDownload.push(bundle);
      }
    }

    const stats: CloneStats = {
      totalBundles: allBundlesInRange.length,
      downloadedBundles: 0,
      skippedBundles: 0,
      failedBundles: 0,
      totalBytes: allBundlesInRange.reduce((sum, b) => sum + b.compressed_size, 0),
      downloadedBytes: 0,
    };

    // Bundle numbers known to be present locally
    const downloadedBundleNumbers = new Set<number>(alreadyDownloaded);

    // Quick (size-only) verification of bundles listed in the local index
    for (const metadata of bundlesToCheck) {
      const bundleFile = Bun.file(this.getBundlePath(metadata.bundle_number));

      if ((await bundleFile.exists()) && bundleFile.size === metadata.compressed_size) {
        stats.skippedBundles++;
        stats.downloadedBytes += metadata.compressed_size;
        continue;
      }

      // File missing or wrong size, need to re-download
      downloadedBundleNumbers.delete(metadata.bundle_number);
      bundlesToDownload.push(metadata);
    }

    if (stats.skippedBundles > 0) {
      const invalidCount = bundlesToCheck.length - stats.skippedBundles;
      console.error(`Resuming: found ${stats.skippedBundles} valid bundles, re-downloading ${invalidCount} corrupted bundles\n`);
    }

    // Saves an index listing only locally present bundles. Saves are chained
    // so the periodic timer and the final save never write concurrently.
    let saveChain: Promise<void> = Promise.resolve();
    const savePartialIndex = (): Promise<void> => {
      const run = async (): Promise<void> => {
        const partialBundles = remoteIndex.bundles.filter(b =>
          downloadedBundleNumbers.has(b.bundle_number)
        );

        if (partialBundles.length === 0) return;

        const partialIndex: BundleIndex = {
          ...remoteIndex,
          // Derived from the saved list so both fields always agree.
          last_bundle: partialBundles.reduce((max, b) => Math.max(max, b.bundle_number), 0),
          bundles: partialBundles,
        };

        await this.saveIndex(partialIndex);
      };

      saveChain = saveChain.then(run, run);
      return saveChain;
    };

    // If nothing to download, just make sure the local index is up to date
    if (bundlesToDownload.length === 0) {
      await savePartialIndex();
      return stats;
    }

    console.error(`Downloading ${bundlesToDownload.length} bundles...\n`);

    // Downloads, verifies and stores a single bundle
    const downloadBundle = async (metadata: BundleMetadata): Promise<void> => {
      if (shouldStop && shouldStop()) {
        return;
      }

      const bundlePath = this.getBundlePath(metadata.bundle_number);

      try {
        const response = await fetch(`${baseUrl}/data/${metadata.bundle_number}`);
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }

        const data = await response.arrayBuffer();

        if (verify) {
          const downloadedHash = sha256(new Uint8Array(data));
          if (downloadedHash !== metadata.compressed_hash) {
            throw new Error(`Hash mismatch: ${downloadedHash} != ${metadata.compressed_hash}`);
          }
        }

        await Bun.write(bundlePath, data);

        stats.downloadedBundles++;
        stats.downloadedBytes += metadata.compressed_size;
        downloadedBundleNumbers.add(metadata.bundle_number);

        if (onProgress) {
          onProgress({ ...stats });
        }
      } catch (error) {
        stats.failedBundles++;
        throw new Error(`Failed to download bundle ${metadata.bundle_number}: ${error}`);
      }
    };

    // Periodic index saving (every 5 seconds)
    const saveInterval = setInterval(() => {
      savePartialIndex().catch((error) => {
        console.error('Error saving partial index:', error);
      });
    }, 5000);

    try {
      // Download in parallel batches; allSettled keeps one failed bundle
      // from aborting the rest of its batch.
      for (let i = 0; i < bundlesToDownload.length; i += batchSize) {
        if (shouldStop && shouldStop()) {
          break;
        }

        const batch = bundlesToDownload.slice(i, i + batchSize);
        await Promise.allSettled(batch.map(downloadBundle));
      }
    } finally {
      clearInterval(saveInterval);
      await savePartialIndex();
    }

    return stats;
  }

  /**
   * Verify the integrity of a bundle.
   *
   * Checks both the compressed file hash and the content hash against
   * the values stored in the bundle index. A missing, unreadable or
   * undecompressable file is reported as a verification error rather than
   * thrown.
   *
   * @param bundleNum - The bundle number to verify
   * @returns Promise resolving to verification result
   * @throws Error only if the index itself cannot be loaded
   *
   * @example
   * ```ts
   * const result = await bundle.verifyBundle(42);
   * if (result.valid) {
   *   console.log('Bundle is valid');
   * } else {
   *   console.error('Verification failed:');
   *   result.errors.forEach(err => console.error(`  - ${err}`));
   * }
   * ```
   */
  async verifyBundle(bundleNum: number): Promise<{ valid: boolean; errors: string[] }> {
    const metadata = await this.getMetadata(bundleNum);
    if (!metadata) {
      return { valid: false, errors: [`Bundle ${bundleNum} not found in index`] };
    }

    const errors: string[] = [];
    const path = this.getBundlePath(bundleNum);

    let compressed: ArrayBuffer;
    try {
      compressed = await Bun.file(path).arrayBuffer();
    } catch (error) {
      return { valid: false, errors: [`Cannot read bundle file ${path}: ${describeError(error)}`] };
    }

    // Verify compressed hash
    const compressedHash = sha256(new Uint8Array(compressed));
    if (compressedHash !== metadata.compressed_hash) {
      errors.push(`Compressed hash mismatch: ${compressedHash} != ${metadata.compressed_hash}`);
    }

    // Verify content hash; corrupt data typically fails to decompress at all
    try {
      const decompressed = Bun.zstdDecompressSync(compressed);
      const contentHash = sha256(decompressed);
      if (contentHash !== metadata.content_hash) {
        errors.push(`Content hash mismatch: ${contentHash} != ${metadata.content_hash}`);
      }
    } catch (error) {
      errors.push(`Decompression failed: ${describeError(error)}`);
    }

    return { valid: errors.length === 0, errors };
  }

  /**
   * Calculate a chain hash linking bundles together.
   *
   * Genesis bundles hash `plcbundle:genesis:<contentHash>`; every other
   * bundle hashes `<parentHash>:<contentHash>`.
   *
   * @param parentHash - Hash of the parent bundle (ignored for genesis)
   * @param contentHash - Hash of this bundle's content
   * @param isGenesis - Whether this is the first bundle in the chain
   * @returns The calculated chain hash as a hex string
   *
   * @example
   * ```ts
   * // Genesis bundle
   * const genesisHash = bundle.calculateChainHash('', contentHash, true);
   *
   * // Subsequent bundle
   * const chainHash = bundle.calculateChainHash(parentHash, contentHash, false);
   * ```
   */
  calculateChainHash(parentHash: string, contentHash: string, isGenesis: boolean): string {
    const input = isGenesis
      ? `plcbundle:genesis:${contentHash}`
      : `${parentHash}:${contentHash}`;

    return sha256(input);
  }

  /**
   * Get repository statistics.
   *
   * Summarises the (cached) bundle index; no bundle files are read.
   *
   * @returns Promise resolving to repository statistics
   *
   * @example
   * ```ts
   * const stats = await bundle.getStats();
   * console.log(`Version: ${stats.version}`);
   * console.log(`Bundles: ${stats.totalBundles}`);
   * console.log(`Size: ${(stats.totalSize / 1e9).toFixed(2)} GB`);
   * console.log(`Updated: ${stats.updatedAt}`);
   * ```
   */
  async getStats(): Promise<{
    version: string;
    lastBundle: number;
    totalBundles: number;
    totalSize: number;
    updatedAt: string;
  }> {
    const index = await this.loadIndex();
    return {
      version: index.version,
      lastBundle: index.last_bundle,
      totalBundles: index.bundles.length,
      totalSize: index.total_size_bytes,
      updatedAt: index.updated_at,
    };
  }

  /**
   * Verify the integrity of the entire bundle chain.
   *
   * This method validates:
   * - Each bundle's compressed and content hashes
   * - The chain hash linking each bundle to its parent
   * - The continuity of the chain (no missing bundles)
   * - The genesis bundle has correct initial hash
   *
   * When `start > 1`, the first bundle's parent is checked against bundle
   * `start - 1` from the index; if that entry is absent the parent check is
   * skipped for the first bundle. After a bundle that is missing from the
   * index, the parent check of the next bundle is skipped as well, so the
   * gap is reported once.
   *
   * @param options - Verification options
   * @param options.start - First bundle to verify (default: 1)
   * @param options.end - Last bundle to verify (default: last available bundle)
   * @param options.onProgress - Callback for progress updates, called once per bundle
   * @returns Promise resolving to chain verification result
   * @throws Error if the range is invalid
   *
   * @example Verify entire chain
   * ```ts
   * const result = await bundle.verifyChain();
   *
   * if (result.valid) {
   *   console.log(`✓ All ${result.totalBundles} bundles verified`);
   * } else {
   *   console.error(`✗ Chain invalid: ${result.invalidBundles} errors`);
   *   result.errors.forEach(({ bundleNum, errors }) => {
   *     console.error(`Bundle ${bundleNum}:`);
   *     errors.forEach(e => console.error(`  - ${e}`));
   *   });
   * }
   * ```
   *
   * @example Verify range with progress
   * ```ts
   * const result = await bundle.verifyChain({
   *   start: 1,
   *   end: 100,
   *   onProgress: (current, total) => {
   *     console.log(`Verified ${current}/${total} bundles`);
   *   }
   * });
   * ```
   */
  async verifyChain(options: {
    start?: number;
    end?: number;
    onProgress?: (current: number, total: number) => void;
  } = {}): Promise<ChainVerificationResult> {
    const index = await this.loadIndex();

    const start = options.start || 1;
    const end = options.end || index.last_bundle;

    // Validate range
    if (start < 1 || end > index.last_bundle || start > end) {
      throw new Error(`Invalid bundle range ${start}-${end} (available: 1-${index.last_bundle})`);
    }

    // One pass to index the metadata instead of a linear search per bundle.
    // First entry wins, matching Array.prototype.find.
    const byNumber = new Map<number, BundleMetadata>();
    for (const entry of index.bundles) {
      if (!byNumber.has(entry.bundle_number)) {
        byNumber.set(entry.bundle_number, entry);
      }
    }

    const result: ChainVerificationResult = {
      valid: true,
      totalBundles: end - start + 1,
      validBundles: 0,
      invalidBundles: 0,
      errors: [],
    };

    // Hash the next bundle's `parent` must equal; undefined = unknown, skip.
    let previousHash: string | undefined = start > 1 ? byNumber.get(start - 1)?.hash : '';

    for (let bundleNum = start; bundleNum <= end; bundleNum++) {
      const metadata = byNumber.get(bundleNum);

      if (!metadata) {
        result.valid = false;
        result.invalidBundles++;
        result.errors.push({
          bundleNum,
          errors: ['Bundle missing from index'],
        });
        previousHash = undefined;
      } else {
        const bundleErrors: string[] = [];

        // Verify bundle file hashes
        const verification = await this.verifyBundle(bundleNum);
        if (!verification.valid) {
          bundleErrors.push(...verification.errors);
        }

        // Verify chain linkage
        if (bundleNum === 1) {
          // Genesis bundle should have empty parent
          if (metadata.parent !== '') {
            bundleErrors.push(`Genesis bundle should have empty parent, got: ${metadata.parent}`);
          }

          const expectedHash = this.calculateChainHash('', metadata.content_hash, true);
          if (metadata.hash !== expectedHash) {
            bundleErrors.push(`Invalid genesis chain hash: ${metadata.hash} != ${expectedHash}`);
          }
        } else {
          if (previousHash !== undefined && metadata.parent !== previousHash) {
            bundleErrors.push(`Parent hash mismatch: expected ${previousHash}, got ${metadata.parent}`);
          }

          const expectedHash = this.calculateChainHash(metadata.parent, metadata.content_hash, false);
          if (metadata.hash !== expectedHash) {
            bundleErrors.push(`Invalid chain hash: ${metadata.hash} != ${expectedHash}`);
          }
        }

        // Record results
        if (bundleErrors.length > 0) {
          result.valid = false;
          result.invalidBundles++;
          result.errors.push({
            bundleNum,
            errors: bundleErrors,
          });
        } else {
          result.validBundles++;
        }

        previousHash = metadata.hash;
      }

      // Report progress for every bundle, including missing ones
      options.onProgress?.(bundleNum - start + 1, result.totalBundles);
    }

    return result;
  }
}