A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
package bundle

import (
	"bufio"
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"time"

	"github.com/klauspost/compress/zstd"
	"tangled.org/atscan.net/plcbundle/plc"
)

// Operations handles low-level bundle file operations
type Operations struct {
	encoder *zstd.Encoder
	decoder *zstd.Decoder
	logger  Logger
}

// NewOperations creates a new Operations handler with default compression
func NewOperations(logger Logger) (*Operations, error) {
	// Always use default compression (level 3 - good balance)
	encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault))
	if err != nil {
		return nil, fmt.Errorf("failed to create zstd encoder: %w", err)
	}

	decoder, err := zstd.NewReader(nil)
	if err != nil {
		encoder.Close()
		return nil, fmt.Errorf("failed to create zstd decoder: %w", err)
	}

	return &Operations{
		encoder: encoder,
		decoder: decoder,
		logger:  logger,
	}, nil
}

// Close cleans up resources
func (op *Operations) Close() {
	if op.encoder != nil {
		op.encoder.Close()
	}
	if op.decoder != nil {
		op.decoder.Close()
	}
}

// ========================================
// CORE SERIALIZATION (JSONL)
// ========================================

// SerializeJSONL serializes operations to newline-delimited JSON
// This is the ONE method everyone should use for serialization
func (op *Operations) SerializeJSONL(operations []plc.PLCOperation) []byte {
	var buf bytes.Buffer

	for _, operation := range operations {
		// Use RawJSON if available (preserves exact format)
		if len(operation.RawJSON) > 0 {
			buf.Write(operation.RawJSON)
		} else {
			// Fallback to marshaling
			data, _ := json.Marshal(operation)
			buf.Write(data)
		}
		buf.WriteByte('\n')
	}

	return buf.Bytes()
}

// ParseJSONL parses newline-delimited JSON into operations
// This is the ONE method everyone should use for deserialization
func (op *Operations) ParseJSONL(data []byte) ([]plc.PLCOperation, error) {
	var operations []plc.PLCOperation
	scanner := bufio.NewScanner(bytes.NewReader(data))

	// Set a large buffer for long lines
	buf := make([]byte, 0, 64*1024)
	scanner.Buffer(buf, 1024*1024)

	lineNum := 0
	for scanner.Scan() {
		lineNum++
		line := scanner.Bytes()

		if len(line) == 0 {
			continue
		}

		var operation plc.PLCOperation
		if err := json.Unmarshal(line, &operation); err != nil {
			return nil, fmt.Errorf("failed to parse line %d: %w", lineNum, err)
		}

		// Store raw JSON
		operation.RawJSON = make([]byte, len(line))
		copy(operation.RawJSON, line)

		operations = append(operations, operation)
	}

	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scanner error: %w", err)
	}

	return operations, nil
}

// ========================================
// FILE OPERATIONS (uses JSONL + compression)
// ========================================

// LoadBundle loads a compressed bundle from disk
func (op *Operations) LoadBundle(path string) ([]plc.PLCOperation, error) {
	// Read compressed file
	compressed, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to read file: %w", err)
	}

	// Decompress
	decompressed, err := op.decoder.DecodeAll(compressed, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to decompress: %w", err)
	}

	// Parse JSONL
	return op.ParseJSONL(decompressed)
}
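
// Round-trip sketch (illustrative only; the path is hypothetical): because
// ParseJSONL stores each line's exact bytes in RawJSON, and SerializeJSONL
// prefers RawJSON over re-marshaling, loading a bundle and serializing it
// again reproduces the original JSONL byte for byte (assuming one operation
// per newline-terminated line) - which is what makes the content hashes
// below reproducible:
//
//	loaded, _ := op.LoadBundle("bundle.jsonl.zst")
//	again := op.SerializeJSONL(loaded)
//	// op.Hash(again) matches the bundle's recorded content hash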

// SaveBundle saves operations to disk (compressed)
// Returns: contentHash, compressedHash, contentSize, compressedSize, error
func (op *Operations) SaveBundle(path string, operations []plc.PLCOperation) (string, string, int64, int64, error) {
	// Serialize to JSONL
	jsonlData := op.SerializeJSONL(operations)
	contentSize := int64(len(jsonlData))
	contentHash := op.Hash(jsonlData)

	// Compress
	compressed := op.encoder.EncodeAll(jsonlData, nil)
	compressedSize := int64(len(compressed))
	compressedHash := op.Hash(compressed)

	// Write to file
	if err := os.WriteFile(path, compressed, 0644); err != nil {
		return "", "", 0, 0, fmt.Errorf("failed to write file: %w", err)
	}

	return contentHash, compressedHash, contentSize, compressedSize, nil
}

// ========================================
// STREAMING
// ========================================

// StreamRaw returns a reader for the raw compressed bundle file
func (op *Operations) StreamRaw(path string) (io.ReadCloser, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open bundle: %w", err)
	}
	return file, nil
}

// StreamDecompressed returns a reader for decompressed bundle data
func (op *Operations) StreamDecompressed(path string) (io.ReadCloser, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open bundle: %w", err)
	}

	// Create a new decoder for this stream
	decoder, err := zstd.NewReader(file)
	if err != nil {
		file.Close()
		return nil, fmt.Errorf("failed to create decompressor: %w", err)
	}

	// Return a wrapper that closes both the decoder and file
	return &decompressedReader{
		decoder: decoder,
		file:    file,
	}, nil
}

// decompressedReader wraps a zstd decoder and underlying file
type decompressedReader struct {
	decoder *zstd.Decoder
	file    *os.File
}

func (dr *decompressedReader) Read(p []byte) (int, error) {
	return dr.decoder.Read(p)
}

func (dr *decompressedReader) Close() error {
	dr.decoder.Close()
	return dr.file.Close()
}

// ========================================
// HASHING
// ========================================

// Hash computes SHA256 hash of data
func (op *Operations) Hash(data []byte) string {
	h := sha256.Sum256(data)
	return hex.EncodeToString(h[:])
}

// CalculateChainHash calculates the cumulative chain hash
func (op *Operations) CalculateChainHash(parent string, contentHash string) string {
	var data string
	if parent == "" {
		// Genesis bundle (first bundle)
		data = "plcbundle:genesis:" + contentHash
	} else {
		// Subsequent bundles - chain parent hash with current content
		data = parent + ":" + contentHash
	}
	return op.Hash([]byte(data))
}
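
// Chain construction sketch (illustrative only): each bundle's chain hash
// commits to every bundle before it, so verifying the newest hash verifies
// the entire history:
//
//	chain_0 = SHA256("plcbundle:genesis:" + contentHash_0)
//	chain_n = SHA256(chain_{n-1} + ":" + contentHash_n)
//
// A verifier that recomputes each bundle's content hash and replays this
// recurrence will detect any tampered, dropped, or reordered bundle.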

// CalculateFileHashes calculates both content and compressed hashes efficiently
func (op *Operations) CalculateFileHashes(path string) (compressedHash string, compressedSize int64, contentHash string, contentSize int64, err error) {
	// Read compressed file
	compressedData, err := os.ReadFile(path)
	if err != nil {
		return "", 0, "", 0, fmt.Errorf("failed to read file: %w", err)
	}

	// Calculate compressed hash
	compressedHash = op.Hash(compressedData)
	compressedSize = int64(len(compressedData))

	// Decompress
	decompressed, err := op.decoder.DecodeAll(compressedData, nil)
	if err != nil {
		return "", 0, "", 0, fmt.Errorf("failed to decompress: %w", err)
	}

	// Calculate content hash
	contentHash = op.Hash(decompressed)
	contentSize = int64(len(decompressed))

	return compressedHash, compressedSize, contentHash, contentSize, nil
}

// VerifyHash verifies the hash of a bundle file
func (op *Operations) VerifyHash(path string, expectedHash string) (bool, string, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return false, "", fmt.Errorf("failed to read file: %w", err)
	}

	actualHash := op.Hash(data)
	return actualHash == expectedHash, actualHash, nil
}

// ========================================
// UTILITY FUNCTIONS
// ========================================

// FileExists checks if a file exists
func (op *Operations) FileExists(path string) bool {
	_, err := os.Stat(path)
	return err == nil
}

// GetFileSize returns the size of a file
func (op *Operations) GetFileSize(path string) (int64, error) {
	info, err := os.Stat(path)
	if err != nil {
		return 0, err
	}
	return info.Size(), nil
}

// ExtractUniqueDIDs extracts unique DIDs from operations
func (op *Operations) ExtractUniqueDIDs(operations []plc.PLCOperation) []string {
	didSet := make(map[string]bool)
	for _, operation := range operations {
		didSet[operation.DID] = true
	}

	dids := make([]string, 0, len(didSet))
	for did := range didSet {
		dids = append(dids, did)
	}

	return dids
}

// GetBoundaryCIDs returns CIDs that share the same timestamp as the last operation
func (op *Operations) GetBoundaryCIDs(operations []plc.PLCOperation) (time.Time, map[string]bool) {
	if len(operations) == 0 {
		return time.Time{}, nil
	}

	lastOp := operations[len(operations)-1]
	boundaryTime := lastOp.CreatedAt
	cidSet := make(map[string]bool)

	// Walk backwards from the end
	for i := len(operations) - 1; i >= 0; i-- {
		operation := operations[i]
		if operation.CreatedAt.Equal(boundaryTime) {
			cidSet[operation.CID] = true
		} else {
			break
		}
	}

	return boundaryTime, cidSet
}

// StripBoundaryDuplicates removes operations that are in prevBoundaryCIDs
func (op *Operations) StripBoundaryDuplicates(operations []plc.PLCOperation, boundaryTimestamp string, prevBoundaryCIDs map[string]bool) []plc.PLCOperation {
	if len(operations) == 0 || len(prevBoundaryCIDs) == 0 {
		return operations
	}

	boundaryTime, err := time.Parse(time.RFC3339Nano, boundaryTimestamp)
	if err != nil {
		return operations
	}

	startIdx := 0
	for startIdx < len(operations) {
		operation := operations[startIdx]

		if operation.CreatedAt.After(boundaryTime) {
			break
		}

		if operation.CreatedAt.Equal(boundaryTime) && prevBoundaryCIDs[operation.CID] {
			startIdx++
			continue
		}

		break
	}

	return operations[startIdx:]
}
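
// Pagination sketch (illustrative only; prevPage and nextPage are
// hypothetical fetch results): consecutive export pages can overlap on
// operations that share the boundary timestamp, so these two helpers are
// meant to be used as a pair across fetches:
//
//	boundaryTime, cids := op.GetBoundaryCIDs(prevPage)
//	deduped := op.StripBoundaryDuplicates(nextPage,
//		boundaryTime.Format(time.RFC3339Nano), cids)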

// CreateBundle creates a complete bundle structure from operations
func (op *Operations) CreateBundle(bundleNumber int, operations []plc.PLCOperation, cursor string, parent string) *Bundle {
	if len(operations) != BUNDLE_SIZE {
		op.logger.Printf("Warning: bundle has %d operations, expected %d", len(operations), BUNDLE_SIZE)
	}

	dids := op.ExtractUniqueDIDs(operations)
	_, boundaryCIDs := op.GetBoundaryCIDs(operations)

	// Convert boundary CIDs map to slice
	cidSlice := make([]string, 0, len(boundaryCIDs))
	for cid := range boundaryCIDs {
		cidSlice = append(cidSlice, cid)
	}

	bundle := &Bundle{
		BundleNumber: bundleNumber,
		StartTime:    operations[0].CreatedAt,
		EndTime:      operations[len(operations)-1].CreatedAt,
		Operations:   operations,
		DIDCount:     len(dids),
		Cursor:       cursor,
		Parent:       parent,
		BoundaryCIDs: cidSlice,
		Compressed:   true,
		CreatedAt:    time.Now().UTC(),
	}

	return bundle
}

// ========================================
// METADATA CALCULATION
// ========================================

// CalculateBundleMetadata calculates complete metadata for a bundle
// This is the ONE method everyone should use for metadata calculation
func (op *Operations) CalculateBundleMetadata(bundleNumber int, path string, operations []plc.PLCOperation, parent string, cursor string) (*BundleMetadata, error) {
	if len(operations) == 0 {
		return nil, fmt.Errorf("bundle is empty")
	}

	// Get file info
	info, err := os.Stat(path)
	if err != nil {
		return nil, fmt.Errorf("failed to stat file: %w", err)
	}

	// Extract unique DIDs
	dids := op.ExtractUniqueDIDs(operations)

	// Serialize to JSONL and calculate content hash
	jsonlData := op.SerializeJSONL(operations)
	contentSize := int64(len(jsonlData))
	contentHash := op.Hash(jsonlData)

	// Read compressed file and calculate compressed hash
	compressedData, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to read compressed file: %w", err)
	}
	compressedHash := op.Hash(compressedData)
	compressedSize := info.Size()

	// Calculate chain hash
	chainHash := op.CalculateChainHash(parent, contentHash)

	return &BundleMetadata{
		BundleNumber:     bundleNumber,
		StartTime:        operations[0].CreatedAt,
		EndTime:          operations[len(operations)-1].CreatedAt,
		OperationCount:   len(operations),
		DIDCount:         len(dids),
		Hash:             chainHash,   // Chain hash (primary)
		ContentHash:      contentHash, // Content hash
		Parent:           parent,      // Parent chain hash
		CompressedHash:   compressedHash,
		CompressedSize:   compressedSize,
		UncompressedSize: contentSize,
		Cursor:           cursor,
		CreatedAt:        time.Now().UTC(),
	}, nil
}
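
// Verification sketch (illustrative only; meta is a hypothetical previously
// recorded *BundleMetadata): an independent party can re-derive every hash
// from the file alone, without trusting the publisher:
//
//	_, _, contentHash, _, err := op.CalculateFileHashes(path)
//	if err == nil &&
//		contentHash == meta.ContentHash &&
//		op.CalculateChainHash(meta.Parent, contentHash) == meta.Hash {
//		// bundle content and chain position both check out
//	}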

// CalculateBundleMetadataFast calculates metadata quickly without chain hash
// Used during parallel scanning - chain hash calculated later sequentially
func (op *Operations) CalculateBundleMetadataFast(bundleNumber int, path string, operations []plc.PLCOperation, cursor string) (*BundleMetadata, error) {
	if len(operations) == 0 {
		return nil, fmt.Errorf("bundle is empty")
	}

	// Calculate hashes efficiently (read file once)
	compressedHash, compressedSize, contentHash, contentSize, err := op.CalculateFileHashes(path)
	if err != nil {
		return nil, err
	}

	// Extract unique DIDs
	dids := op.ExtractUniqueDIDs(operations)

	// Note: Hash and Parent are left empty - the chain hash is calculated later sequentially
	return &BundleMetadata{
		BundleNumber:     bundleNumber,
		StartTime:        operations[0].CreatedAt,
		EndTime:          operations[len(operations)-1].CreatedAt,
		OperationCount:   len(operations),
		DIDCount:         len(dids),
		Hash:             "",          // Chain hash - calculated later
		ContentHash:      contentHash, // Content hash
		Parent:           "",          // Parent - set later
		CompressedHash:   compressedHash,
		CompressedSize:   compressedSize,
		UncompressedSize: contentSize,
		Cursor:           cursor,
		CreatedAt:        time.Now().UTC(),
	}, nil
}
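
// verifyBundleChain is an illustrative sketch, not part of this file's API:
// it walks bundle files in order and replays the chain-hash recurrence,
// returning the head chain hash if every link verifies. The paths and metas
// arguments are hypothetical caller-supplied inputs.
func verifyBundleChain(op *Operations, paths []string, metas []*BundleMetadata) (string, error) {
	if len(paths) != len(metas) {
		return "", fmt.Errorf("have %d paths but %d metadata records", len(paths), len(metas))
	}

	parent := ""
	for i, path := range paths {
		// Recompute the content hash from the file itself
		_, _, contentHash, _, err := op.CalculateFileHashes(path)
		if err != nil {
			return "", fmt.Errorf("bundle %d: %w", i, err)
		}
		if contentHash != metas[i].ContentHash {
			return "", fmt.Errorf("bundle %d: content hash mismatch", i)
		}

		// Replay the chain: each link must match the recorded chain hash
		chainHash := op.CalculateChainHash(parent, contentHash)
		if chainHash != metas[i].Hash {
			return "", fmt.Errorf("bundle %d: chain hash mismatch", i)
		}
		parent = chainHash
	}

	return parent, nil
}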