A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
package bundle

import (
	"bufio"
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"time"

	gozstd "github.com/DataDog/zstd"
	"github.com/goccy/go-json"
	"tangled.org/atscan.net/plcbundle/plc"
)

// Operations handles low-level bundle file operations
type Operations struct {
	logger Logger
}

func NewOperations(logger Logger) (*Operations, error) {
	return &Operations{logger: logger}, nil
}

func (op *Operations) Close() {
	// Nothing to close
}

// ========================================
// CORE SERIALIZATION (JSONL)
// ========================================

// SerializeJSONL serializes operations to newline-delimited JSON
// This is the ONE method everyone should use for serialization
func (op *Operations) SerializeJSONL(operations []plc.PLCOperation) []byte {
	var buf bytes.Buffer

	for _, operation := range operations {
		// Use RawJSON if available (preserves exact format)
		if len(operation.RawJSON) > 0 {
			buf.Write(operation.RawJSON)
		} else {
			// Fallback to marshaling
			data, _ := json.Marshal(operation)
			buf.Write(data)
		}
		buf.WriteByte('\n')
	}

	return buf.Bytes()
}

// ParseJSONL parses newline-delimited JSON into operations
// This is the ONE method everyone should use for deserialization
func (op *Operations) ParseJSONL(data []byte) ([]plc.PLCOperation, error) {
	var operations []plc.PLCOperation
	scanner := bufio.NewScanner(bytes.NewReader(data))
	buf := make([]byte, 0, 64*1024)
	scanner.Buffer(buf, 1024*1024)

	for scanner.Scan() {
		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}

		var operation plc.PLCOperation
		if err := json.UnmarshalNoEscape(line, &operation); err != nil {
			return nil, fmt.Errorf("failed to parse line: %w", err)
		}

		operation.RawJSON = make([]byte, len(line))
		copy(operation.RawJSON, line)
		operations = append(operations, operation)
	}

	// Surface scanner errors (e.g. a line exceeding the buffer) instead of
	// silently returning a truncated list
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scanner error: %w", err)
	}

	return operations, nil
}
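// Illustrative sketch (not part of the original file): a JSONL round trip.
// Because ParseJSONL stores every line it reads back into RawJSON, and
// SerializeJSONL prefers RawJSON over re-marshaling, a parse/serialize cycle
// is byte-for-byte stable, which is what makes the content hash
// reproducible. The method name roundTripCheck is an assumption for this
// example.
func (op *Operations) roundTripCheck(operations []plc.PLCOperation) error {
	first := op.SerializeJSONL(operations)

	parsed, err := op.ParseJSONL(first)
	if err != nil {
		return fmt.Errorf("parse failed: %w", err)
	}

	// Re-serialization uses the preserved RawJSON, so the bytes (and hence
	// the SHA256 content hash) match the first pass exactly
	second := op.SerializeJSONL(parsed)
	if op.Hash(first) != op.Hash(second) {
		return fmt.Errorf("round trip is not byte-stable")
	}
	return nil
}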
// ========================================
// FILE OPERATIONS (uses JSONL + compression)
// ========================================

// LoadBundle loads a compressed bundle
func (op *Operations) LoadBundle(path string) ([]plc.PLCOperation, error) {
	compressed, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to read file: %w", err)
	}

	decompressed, err := gozstd.Decompress(nil, compressed)
	if err != nil {
		return nil, fmt.Errorf("failed to decompress: %w", err)
	}

	return op.ParseJSONL(decompressed)
}

// SaveBundle saves operations to disk (compressed)
// Returns: contentHash, compressedHash, contentSize, compressedSize, error
func (op *Operations) SaveBundle(path string, operations []plc.PLCOperation) (string, string, int64, int64, error) {
	jsonlData := op.SerializeJSONL(operations)
	contentSize := int64(len(jsonlData))
	contentHash := op.Hash(jsonlData)

	// DataDog zstd.Compress returns ([]byte, error)
	compressed, err := gozstd.Compress(nil, jsonlData)
	if err != nil {
		return "", "", 0, 0, fmt.Errorf("failed to compress: %w", err)
	}

	compressedSize := int64(len(compressed))
	compressedHash := op.Hash(compressed)

	if err := os.WriteFile(path, compressed, 0644); err != nil {
		return "", "", 0, 0, fmt.Errorf("failed to write file: %w", err)
	}

	return contentHash, compressedHash, contentSize, compressedSize, nil
}

// LoadOperationAtPosition loads a single operation from a bundle without
// loading the entire bundle
// This is much more efficient for single-operation lookups
func (op *Operations) LoadOperationAtPosition(path string, position int) (*plc.PLCOperation, error) {
	if position < 0 {
		return nil, fmt.Errorf("invalid position: %d", position)
	}

	// Open compressed file
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()

	// Create zstd decompression reader (streaming, no full decompress)
	reader := gozstd.NewReader(file)
	defer reader.Close()

	// Use scanner to skip to target line
	scanner := bufio.NewScanner(reader)
	buf := make([]byte, 0, 64*1024)
	scanner.Buffer(buf, 1024*1024)

	lineNum := 0
	for scanner.Scan() {
		if lineNum == position {
			// Found target line!
			line := scanner.Bytes()

			var operation plc.PLCOperation
			if err := json.UnmarshalNoEscape(line, &operation); err != nil {
				return nil, fmt.Errorf("failed to parse operation at position %d: %w", position, err)
			}

			// Store raw JSON
			operation.RawJSON = make([]byte, len(line))
			copy(operation.RawJSON, line)

			return &operation, nil
		}
		lineNum++
	}

	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scanner error: %w", err)
	}

	return nil, fmt.Errorf("position %d not found (bundle has %d operations)", position, lineNum)
}

// ========================================
// STREAMING
// ========================================

// StreamRaw returns a reader for the raw compressed bundle file
func (op *Operations) StreamRaw(path string) (io.ReadCloser, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open bundle: %w", err)
	}
	return file, nil
}

// StreamDecompressed returns a reader for decompressed bundle data
func (op *Operations) StreamDecompressed(path string) (io.ReadCloser, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open bundle: %w", err)
	}

	// Create zstd reader using DataDog's package
	reader := gozstd.NewReader(file)

	// Return a wrapper that closes both the reader and file
	return &decompressedReader{
		reader: reader,
		file:   file,
	}, nil
}

// decompressedReader wraps a zstd decoder and underlying file
type decompressedReader struct {
	reader io.ReadCloser
	file   *os.File
}

func (dr *decompressedReader) Read(p []byte) (int, error) {
	return dr.reader.Read(p)
}

func (dr *decompressedReader) Close() error {
	// Close both, reporting the decoder's error instead of dropping it
	if err := dr.reader.Close(); err != nil {
		dr.file.Close()
		return err
	}
	return dr.file.Close()
}

// ========================================
// HASHING
// ========================================

// Hash computes SHA256 hash of data
func (op *Operations) Hash(data []byte) string {
	h := sha256.Sum256(data)
	return hex.EncodeToString(h[:])
}

// CalculateChainHash calculates the cumulative chain hash
func (op *Operations) CalculateChainHash(parent string, contentHash string) string {
	var data string
	if parent == "" {
		// Genesis bundle (first bundle)
		data = "plcbundle:genesis:" + contentHash
	} else {
		// Subsequent bundles - chain parent hash with current content
		data = parent + ":" + contentHash
	}
	return op.Hash([]byte(data))
}
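// Illustrative sketch (not part of the original file): folding a sequence of
// content hashes into a single chain head with CalculateChainHash. Each link
// commits to all previous bundles, so one trailing hash is enough to verify
// the entire history. The method name chainHead is an assumption for this
// example.
func (op *Operations) chainHead(contentHashes []string) string {
	parent := "" // empty parent selects the "plcbundle:genesis:" form
	for _, contentHash := range contentHashes {
		parent = op.CalculateChainHash(parent, contentHash)
	}
	return parent
}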
// CalculateFileHashes calculates both content and compressed hashes efficiently
func (op *Operations) CalculateFileHashes(path string) (compressedHash string, compressedSize int64, contentHash string, contentSize int64, err error) {
	// Read compressed file
	compressedData, err := os.ReadFile(path)
	if err != nil {
		return "", 0, "", 0, fmt.Errorf("failed to read file: %w", err)
	}

	// Calculate compressed hash
	compressedHash = op.Hash(compressedData)
	compressedSize = int64(len(compressedData))

	// Decompress with DataDog zstd
	decompressed, err := gozstd.Decompress(nil, compressedData)
	if err != nil {
		return "", 0, "", 0, fmt.Errorf("failed to decompress: %w", err)
	}

	// Calculate content hash
	contentHash = op.Hash(decompressed)
	contentSize = int64(len(decompressed))

	return compressedHash, compressedSize, contentHash, contentSize, nil
}

// VerifyHash verifies the hash of a bundle file
func (op *Operations) VerifyHash(path string, expectedHash string) (bool, string, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return false, "", fmt.Errorf("failed to read file: %w", err)
	}

	actualHash := op.Hash(data)
	return actualHash == expectedHash, actualHash, nil
}

// ========================================
// UTILITY FUNCTIONS
// ========================================

// FileExists checks if a file exists
func (op *Operations) FileExists(path string) bool {
	_, err := os.Stat(path)
	return err == nil
}

// GetFileSize returns the size of a file
func (op *Operations) GetFileSize(path string) (int64, error) {
	info, err := os.Stat(path)
	if err != nil {
		return 0, err
	}
	return info.Size(), nil
}

// ExtractUniqueDIDs extracts unique DIDs from operations
func (op *Operations) ExtractUniqueDIDs(operations []plc.PLCOperation) []string {
	didSet := make(map[string]bool)
	for _, operation := range operations {
		didSet[operation.DID] = true
	}

	dids := make([]string, 0, len(didSet))
	for did := range didSet {
		dids = append(dids, did)
	}

	return dids
}

// GetBoundaryCIDs returns CIDs that share the same timestamp as the last operation
func (op *Operations) GetBoundaryCIDs(operations []plc.PLCOperation) (time.Time, map[string]bool) {
	if len(operations) == 0 {
		return time.Time{}, nil
	}

	lastOp := operations[len(operations)-1]
	boundaryTime := lastOp.CreatedAt
	cidSet := make(map[string]bool)

	// Walk backwards from the end
	for i := len(operations) - 1; i >= 0; i-- {
		operation := operations[i]
		if operation.CreatedAt.Equal(boundaryTime) {
			cidSet[operation.CID] = true
		} else {
			break
		}
	}

	return boundaryTime, cidSet
}

// StripBoundaryDuplicates removes operations that are in prevBoundaryCIDs
func (op *Operations) StripBoundaryDuplicates(operations []plc.PLCOperation, boundaryTimestamp string, prevBoundaryCIDs map[string]bool) []plc.PLCOperation {
	if len(operations) == 0 || len(prevBoundaryCIDs) == 0 {
		return operations
	}

	boundaryTime, err := time.Parse(time.RFC3339Nano, boundaryTimestamp)
	if err != nil {
		return operations
	}

	startIdx := 0
	for startIdx < len(operations) {
		operation := operations[startIdx]

		if operation.CreatedAt.After(boundaryTime) {
			break
		}

		if operation.CreatedAt.Equal(boundaryTime) && prevBoundaryCIDs[operation.CID] {
			startIdx++
			continue
		}

		break
	}

	return operations[startIdx:]
}
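// Illustrative sketch (not part of the original file): how GetBoundaryCIDs
// and StripBoundaryDuplicates pair up when appending a freshly fetched page
// of operations. A page fetched after a timestamp cursor can resend
// operations that share the previous page's final timestamp; the boundary
// CID set filters exactly those out. The method name appendPage is an
// assumption for this example.
func (op *Operations) appendPage(prev, next []plc.PLCOperation) []plc.PLCOperation {
	boundaryTime, boundaryCIDs := op.GetBoundaryCIDs(prev)
	deduped := op.StripBoundaryDuplicates(next,
		boundaryTime.Format(time.RFC3339Nano), boundaryCIDs)
	return append(prev, deduped...)
}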
// CreateBundle creates a complete bundle structure from operations
func (op *Operations) CreateBundle(bundleNumber int, operations []plc.PLCOperation, cursor string, parent string) *Bundle {
	if len(operations) != BUNDLE_SIZE {
		op.logger.Printf("Warning: bundle has %d operations, expected %d", len(operations), BUNDLE_SIZE)
	}

	dids := op.ExtractUniqueDIDs(operations)
	_, boundaryCIDs := op.GetBoundaryCIDs(operations)

	// Convert boundary CIDs map to slice
	cidSlice := make([]string, 0, len(boundaryCIDs))
	for cid := range boundaryCIDs {
		cidSlice = append(cidSlice, cid)
	}

	bundle := &Bundle{
		BundleNumber: bundleNumber,
		StartTime:    operations[0].CreatedAt,
		EndTime:      operations[len(operations)-1].CreatedAt,
		Operations:   operations,
		DIDCount:     len(dids),
		Cursor:       cursor,
		Parent:       parent,
		BoundaryCIDs: cidSlice,
		Compressed:   true,
		CreatedAt:    time.Now().UTC(),
	}

	return bundle
}

// ========================================
// METADATA CALCULATION
// ========================================

// CalculateBundleMetadata calculates complete metadata for a bundle
// This is the ONE method everyone should use for metadata calculation
func (op *Operations) CalculateBundleMetadata(bundleNumber int, path string, operations []plc.PLCOperation, parent string, cursor string) (*BundleMetadata, error) {
	if len(operations) == 0 {
		return nil, fmt.Errorf("bundle is empty")
	}

	// Get file info
	info, err := os.Stat(path)
	if err != nil {
		return nil, fmt.Errorf("failed to stat file: %w", err)
	}

	// Extract unique DIDs
	dids := op.ExtractUniqueDIDs(operations)

	// Serialize to JSONL and calculate content hash
	jsonlData := op.SerializeJSONL(operations)
	contentSize := int64(len(jsonlData))
	contentHash := op.Hash(jsonlData)

	// Read compressed file and calculate compressed hash
	compressedData, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to read compressed file: %w", err)
	}
	compressedHash := op.Hash(compressedData)
	compressedSize := info.Size()

	// Calculate chain hash
	chainHash := op.CalculateChainHash(parent, contentHash)

	return &BundleMetadata{
		BundleNumber:     bundleNumber,
		StartTime:        operations[0].CreatedAt,
		EndTime:          operations[len(operations)-1].CreatedAt,
		OperationCount:   len(operations),
		DIDCount:         len(dids),
		Hash:             chainHash,   // Chain hash (primary)
		ContentHash:      contentHash, // Content hash
		Parent:           parent,      // Parent chain hash
		CompressedHash:   compressedHash,
		CompressedSize:   compressedSize,
		UncompressedSize: contentSize,
		Cursor:           cursor,
		CreatedAt:        time.Now().UTC(),
	}, nil
}
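// Illustrative sketch (not part of the original file): re-verifying stored
// metadata against the bundle file on disk. CalculateFileHashes reads the
// file once and yields both hashes, which is enough to check ContentHash and
// CompressedHash and to recompute the chain hash from Parent. The method
// name verifyMetadata is an assumption for this example.
func (op *Operations) verifyMetadata(path string, meta *BundleMetadata) error {
	compressedHash, _, contentHash, _, err := op.CalculateFileHashes(path)
	if err != nil {
		return err
	}
	if compressedHash != meta.CompressedHash || contentHash != meta.ContentHash {
		return fmt.Errorf("bundle %d: file does not match metadata", meta.BundleNumber)
	}
	if op.CalculateChainHash(meta.Parent, contentHash) != meta.Hash {
		return fmt.Errorf("bundle %d: chain hash mismatch", meta.BundleNumber)
	}
	return nil
}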
// CalculateBundleMetadataFast calculates metadata quickly without chain hash
// Used during parallel scanning - chain hash calculated later sequentially
func (op *Operations) CalculateBundleMetadataFast(bundleNumber int, path string, operations []plc.PLCOperation, cursor string) (*BundleMetadata, error) {
	if len(operations) == 0 {
		return nil, fmt.Errorf("bundle is empty")
	}

	// Calculate hashes efficiently (read file once)
	compressedHash, compressedSize, contentHash, contentSize, err := op.CalculateFileHashes(path)
	if err != nil {
		return nil, err
	}

	// Extract unique DIDs
	dids := op.ExtractUniqueDIDs(operations)

	// Note: Hash and Parent are left empty - the chain hash is calculated
	// later sequentially once the parent is known
	return &BundleMetadata{
		BundleNumber:     bundleNumber,
		StartTime:        operations[0].CreatedAt,
		EndTime:          operations[len(operations)-1].CreatedAt,
		OperationCount:   len(operations),
		DIDCount:         len(dids),
		Hash:             "", // Chain hash - calculated later
		ContentHash:      contentHash,
		Parent:           "", // Parent - set later
		CompressedHash:   compressedHash,
		CompressedSize:   compressedSize,
		UncompressedSize: contentSize,
		Cursor:           cursor,
		CreatedAt:        time.Now().UTC(),
	}, nil
}
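// Illustrative sketch (not part of the original file): the typical write
// path, combining SaveBundle with CalculateBundleMetadata so the recorded
// hashes describe exactly the bytes that were written. The method name
// writeAndDescribe and the %06d.jsonl.zst naming scheme are assumptions for
// this example.
func (op *Operations) writeAndDescribe(bundleNumber int, dir string, operations []plc.PLCOperation, cursor, parent string) (*BundleMetadata, error) {
	path := fmt.Sprintf("%s/%06d.jsonl.zst", dir, bundleNumber)

	if _, _, _, _, err := op.SaveBundle(path, operations); err != nil {
		return nil, err
	}

	return op.CalculateBundleMetadata(bundleNumber, path, operations, parent, cursor)
}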