A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
package bundle

import (
	"bufio"
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"time"

	gozstd "github.com/DataDog/zstd"
	"github.com/goccy/go-json"
	"tangled.org/atscan.net/plcbundle/plc"
)

// Operations handles low-level bundle file operations
type Operations struct {
	logger Logger
}

func NewOperations(logger Logger) (*Operations, error) {
	return &Operations{logger: logger}, nil
}

func (op *Operations) Close() {
	// Nothing to close
}

// ========================================
// CORE SERIALIZATION (JSONL)
// ========================================

// SerializeJSONL serializes operations to newline-delimited JSON.
// This is the ONE method everyone should use for serialization.
func (op *Operations) SerializeJSONL(operations []plc.PLCOperation) []byte {
	var buf bytes.Buffer

	for _, operation := range operations {
		// Use RawJSON if available (preserves the exact upstream byte
		// format, which keeps content hashes stable)
		if len(operation.RawJSON) > 0 {
			buf.Write(operation.RawJSON)
		} else {
			// Fallback to marshaling; a failure here would otherwise
			// silently drop the operation, so log it instead of
			// discarding the error
			data, err := json.Marshal(operation)
			if err != nil {
				op.logger.Printf("Warning: failed to marshal operation: %v", err)
				continue
			}
			buf.Write(data)
		}
		buf.WriteByte('\n')
	}

	return buf.Bytes()
}

// ParseJSONL parses newline-delimited JSON into operations.
// This is the ONE method everyone should use for deserialization.
func (op *Operations) ParseJSONL(data []byte) ([]plc.PLCOperation, error) {
	var operations []plc.PLCOperation
	scanner := bufio.NewScanner(bytes.NewReader(data))
	buf := make([]byte, 0, 64*1024)
	scanner.Buffer(buf, 1024*1024)

	for scanner.Scan() {
		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}

		var operation plc.PLCOperation
		if err := json.UnmarshalNoEscape(line, &operation); err != nil {
			return nil, fmt.Errorf("failed to parse line: %w", err)
		}

		// Keep a private copy of the raw line (the scanner reuses its buffer)
		operation.RawJSON = make([]byte, len(line))
		copy(operation.RawJSON, line)
		operations = append(operations, operation)
	}

	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scanner error: %w", err)
	}

	return operations, nil
}
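
// Round-trip sketch (hypothetical helper, not part of the package API):
// SerializeJSONL followed by ParseJSONL should reproduce the exact input
// bytes, since RawJSON is preserved verbatim; this is what keeps content
// hashes reproducible across resyncs.
func verifyRoundTrip(op *Operations, operations []plc.PLCOperation) error {
	data := op.SerializeJSONL(operations)
	parsed, err := op.ParseJSONL(data)
	if err != nil {
		return err
	}
	if !bytes.Equal(op.SerializeJSONL(parsed), data) {
		return fmt.Errorf("round-trip mismatch")
	}
	return nil
}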

// ========================================
// FILE OPERATIONS (uses JSONL + compression)
// ========================================

// LoadBundle loads a compressed bundle
func (op *Operations) LoadBundle(path string) ([]plc.PLCOperation, error) {
	compressed, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to read file: %w", err)
	}

	decompressed, err := gozstd.Decompress(nil, compressed)
	if err != nil {
		return nil, fmt.Errorf("failed to decompress: %w", err)
	}

	return op.ParseJSONL(decompressed)
}

// SaveBundle saves operations to disk (compressed).
// Returns: contentHash, compressedHash, contentSize, compressedSize, error
func (op *Operations) SaveBundle(path string, operations []plc.PLCOperation) (string, string, int64, int64, error) {
	jsonlData := op.SerializeJSONL(operations)
	contentSize := int64(len(jsonlData))
	contentHash := op.Hash(jsonlData)

	// Compress with zstd (DataDog bindings)
	compressed, err := gozstd.Compress(nil, jsonlData)
	if err != nil {
		return "", "", 0, 0, fmt.Errorf("failed to compress: %w", err)
	}

	compressedSize := int64(len(compressed))
	compressedHash := op.Hash(compressed)

	if err := os.WriteFile(path, compressed, 0644); err != nil {
		return "", "", 0, 0, fmt.Errorf("failed to write file: %w", err)
	}

	return contentHash, compressedHash, contentSize, compressedSize, nil
}
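
// Usage sketch (hypothetical helper): save a bundle, reload it, and confirm
// the reloaded content hashes to the value SaveBundle reported.
func saveAndVerify(op *Operations, path string, operations []plc.PLCOperation) error {
	contentHash, _, _, _, err := op.SaveBundle(path, operations)
	if err != nil {
		return err
	}
	loaded, err := op.LoadBundle(path)
	if err != nil {
		return err
	}
	if got := op.Hash(op.SerializeJSONL(loaded)); got != contentHash {
		return fmt.Errorf("content hash mismatch: got %s, want %s", got, contentHash)
	}
	return nil
}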

// LoadOperationAtPosition loads a single operation from a bundle without
// loading the entire bundle. This is much more efficient for
// single-operation lookups.
func (op *Operations) LoadOperationAtPosition(path string, position int) (*plc.PLCOperation, error) {
	if position < 0 {
		return nil, fmt.Errorf("invalid position: %d", position)
	}

	// Open compressed file
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()

	// Create zstd decompression reader (streaming, no full decompress)
	reader := gozstd.NewReader(file)
	defer reader.Close()

	// Use scanner to skip to the target line
	scanner := bufio.NewScanner(reader)
	buf := make([]byte, 0, 64*1024)
	scanner.Buffer(buf, 1024*1024)

	lineNum := 0
	for scanner.Scan() {
		if lineNum == position {
			// Found the target line
			line := scanner.Bytes()

			var operation plc.PLCOperation
			if err := json.UnmarshalNoEscape(line, &operation); err != nil {
				return nil, fmt.Errorf("failed to parse operation at position %d: %w", position, err)
			}

			// Store raw JSON (copy, since the scanner reuses its buffer)
			operation.RawJSON = make([]byte, len(line))
			copy(operation.RawJSON, line)

			return &operation, nil
		}
		lineNum++
	}

	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scanner error: %w", err)
	}

	return nil, fmt.Errorf("position %d not found (bundle has %d operations)", position, lineNum)
}
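
// Usage sketch (hypothetical; assumes operations are numbered globally,
// bundles hold exactly BUNDLE_SIZE operations, and bundlePath maps a bundle
// number to its file): resolve one operation by global sequence number
// without loading a whole bundle into memory.
func loadBySequence(op *Operations, bundlePath func(int) string, seq int) (*plc.PLCOperation, error) {
	bundleNumber := seq / BUNDLE_SIZE
	position := seq % BUNDLE_SIZE
	return op.LoadOperationAtPosition(bundlePath(bundleNumber), position)
}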

// ========================================
// STREAMING
// ========================================

// StreamRaw returns a reader for the raw compressed bundle file
func (op *Operations) StreamRaw(path string) (io.ReadCloser, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open bundle: %w", err)
	}
	return file, nil
}

// StreamDecompressed returns a reader for decompressed bundle data
func (op *Operations) StreamDecompressed(path string) (io.ReadCloser, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open bundle: %w", err)
	}

	// Create zstd reader using DataDog's package
	reader := gozstd.NewReader(file)

	// Return a wrapper that closes both the reader and the file
	return &decompressedReader{
		reader: reader,
		file:   file,
	}, nil
}

// decompressedReader wraps a zstd decoder and its underlying file
type decompressedReader struct {
	reader io.ReadCloser
	file   *os.File
}

func (dr *decompressedReader) Read(p []byte) (int, error) {
	return dr.reader.Read(p)
}

func (dr *decompressedReader) Close() error {
	// Close both; report the first error rather than dropping it
	rerr := dr.reader.Close()
	ferr := dr.file.Close()
	if rerr != nil {
		return rerr
	}
	return ferr
}
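
// Streaming sketch (hypothetical helper): count operations by scanning the
// decompressed stream line by line, never holding the whole JSONL payload in
// memory.
func countOperations(op *Operations, path string) (int, error) {
	rc, err := op.StreamDecompressed(path)
	if err != nil {
		return 0, err
	}
	defer rc.Close()

	scanner := bufio.NewScanner(rc)
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	count := 0
	for scanner.Scan() {
		if len(scanner.Bytes()) > 0 {
			count++
		}
	}
	return count, scanner.Err()
}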

// ========================================
// HASHING
// ========================================

// Hash computes the SHA-256 hash of data, hex-encoded
func (op *Operations) Hash(data []byte) string {
	h := sha256.Sum256(data)
	return hex.EncodeToString(h[:])
}

// CalculateChainHash calculates the cumulative chain hash
func (op *Operations) CalculateChainHash(parent string, contentHash string) string {
	var data string
	if parent == "" {
		// Genesis bundle (first bundle)
		data = "plcbundle:genesis:" + contentHash
	} else {
		// Subsequent bundles - chain the parent hash with the current content
		data = parent + ":" + contentHash
	}
	return op.Hash([]byte(data))
}
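
// Verification sketch (hypothetical helper; assumes metas is ordered by
// bundle number): recompute every link of the chain from parent and content
// hashes, failing on the first bundle whose stored chain hash does not match.
func verifyChain(op *Operations, metas []*BundleMetadata) error {
	parent := ""
	for _, m := range metas {
		expected := op.CalculateChainHash(parent, m.ContentHash)
		if m.Hash != expected {
			return fmt.Errorf("bundle %d: chain hash mismatch", m.BundleNumber)
		}
		parent = m.Hash
	}
	return nil
}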

// CalculateFileHashes calculates both content and compressed hashes efficiently
func (op *Operations) CalculateFileHashes(path string) (compressedHash string, compressedSize int64, contentHash string, contentSize int64, err error) {
	// Read compressed file
	compressedData, err := os.ReadFile(path)
	if err != nil {
		return "", 0, "", 0, fmt.Errorf("failed to read file: %w", err)
	}

	// Calculate compressed hash
	compressedHash = op.Hash(compressedData)
	compressedSize = int64(len(compressedData))

	// Decompress with DataDog zstd
	decompressed, err := gozstd.Decompress(nil, compressedData)
	if err != nil {
		return "", 0, "", 0, fmt.Errorf("failed to decompress: %w", err)
	}

	// Calculate content hash
	contentHash = op.Hash(decompressed)
	contentSize = int64(len(decompressed))

	return compressedHash, compressedSize, contentHash, contentSize, nil
}

// VerifyHash verifies the hash of a bundle file
func (op *Operations) VerifyHash(path string, expectedHash string) (bool, string, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return false, "", fmt.Errorf("failed to read file: %w", err)
	}

	actualHash := op.Hash(data)
	return actualHash == expectedHash, actualHash, nil
}

// ========================================
// UTILITY FUNCTIONS
// ========================================

// FileExists reports whether a file exists (any stat error, including a
// permission error, is treated as "does not exist")
func (op *Operations) FileExists(path string) bool {
	_, err := os.Stat(path)
	return err == nil
}

// GetFileSize returns the size of a file in bytes
func (op *Operations) GetFileSize(path string) (int64, error) {
	info, err := os.Stat(path)
	if err != nil {
		return 0, err
	}
	return info.Size(), nil
}

// ExtractUniqueDIDs extracts the set of unique DIDs from operations
func (op *Operations) ExtractUniqueDIDs(operations []plc.PLCOperation) []string {
	didSet := make(map[string]bool)
	for _, operation := range operations {
		didSet[operation.DID] = true
	}

	dids := make([]string, 0, len(didSet))
	for did := range didSet {
		dids = append(dids, did)
	}

	return dids
}

// GetBoundaryCIDs returns the CIDs that share the same timestamp as the last
// operation. These mark the bundle boundary: a later fetch whose cursor
// equals that timestamp may replay them.
func (op *Operations) GetBoundaryCIDs(operations []plc.PLCOperation) (time.Time, map[string]bool) {
	if len(operations) == 0 {
		return time.Time{}, nil
	}

	lastOp := operations[len(operations)-1]
	boundaryTime := lastOp.CreatedAt
	cidSet := make(map[string]bool)

	// Walk backwards from the end, collecting CIDs until the timestamp changes
	for i := len(operations) - 1; i >= 0; i-- {
		operation := operations[i]
		if operation.CreatedAt.Equal(boundaryTime) {
			cidSet[operation.CID] = true
		} else {
			break
		}
	}

	return boundaryTime, cidSet
}

// StripBoundaryDuplicates removes leading operations that appear in
// prevBoundaryCIDs, i.e. operations already captured at the end of the
// previous bundle
func (op *Operations) StripBoundaryDuplicates(operations []plc.PLCOperation, boundaryTimestamp string, prevBoundaryCIDs map[string]bool) []plc.PLCOperation {
	if len(operations) == 0 || len(prevBoundaryCIDs) == 0 {
		return operations
	}

	boundaryTime, err := time.Parse(time.RFC3339Nano, boundaryTimestamp)
	if err != nil {
		// Unparsable boundary timestamp: nothing can be safely stripped
		return operations
	}

	startIdx := 0
	for startIdx < len(operations) {
		operation := operations[startIdx]

		if operation.CreatedAt.After(boundaryTime) {
			break
		}

		if operation.CreatedAt.Equal(boundaryTime) && prevBoundaryCIDs[operation.CID] {
			startIdx++
			continue
		}

		break
	}

	return operations[startIdx:]
}
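
// Dedup sketch (hypothetical helper): when the next batch is fetched with a
// cursor equal to the previous bundle's end time, the directory may replay
// operations sharing that timestamp; pairing GetBoundaryCIDs with
// StripBoundaryDuplicates removes exactly that overlap.
func dedupAcrossBoundary(op *Operations, prev, next []plc.PLCOperation) []plc.PLCOperation {
	boundaryTime, boundaryCIDs := op.GetBoundaryCIDs(prev)
	return op.StripBoundaryDuplicates(next, boundaryTime.Format(time.RFC3339Nano), boundaryCIDs)
}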

// CreateBundle creates a complete bundle structure from operations
func (op *Operations) CreateBundle(bundleNumber int, operations []plc.PLCOperation, cursor string, parent string) *Bundle {
	// Guard against an empty slice before indexing operations[0] below
	if len(operations) == 0 {
		op.logger.Printf("Warning: CreateBundle called with no operations")
		return nil
	}
	if len(operations) != BUNDLE_SIZE {
		op.logger.Printf("Warning: bundle has %d operations, expected %d", len(operations), BUNDLE_SIZE)
	}

	dids := op.ExtractUniqueDIDs(operations)
	_, boundaryCIDs := op.GetBoundaryCIDs(operations)

	// Convert the boundary CIDs map to a slice
	cidSlice := make([]string, 0, len(boundaryCIDs))
	for cid := range boundaryCIDs {
		cidSlice = append(cidSlice, cid)
	}

	bundle := &Bundle{
		BundleNumber: bundleNumber,
		StartTime:    operations[0].CreatedAt,
		EndTime:      operations[len(operations)-1].CreatedAt,
		Operations:   operations,
		DIDCount:     len(dids),
		Cursor:       cursor,
		Parent:       parent,
		BoundaryCIDs: cidSlice,
		Compressed:   true,
		CreatedAt:    time.Now().UTC(),
	}

	return bundle
}

// ========================================
// METADATA CALCULATION
// ========================================

// CalculateBundleMetadata calculates complete metadata for a bundle.
// This is the ONE method everyone should use for metadata calculation.
func (op *Operations) CalculateBundleMetadata(bundleNumber int, path string, operations []plc.PLCOperation, parent string, cursor string) (*BundleMetadata, error) {
	if len(operations) == 0 {
		return nil, fmt.Errorf("bundle is empty")
	}

	// Get file info
	info, err := os.Stat(path)
	if err != nil {
		return nil, fmt.Errorf("failed to stat file: %w", err)
	}

	// Extract unique DIDs
	dids := op.ExtractUniqueDIDs(operations)

	// Serialize to JSONL and calculate the content hash
	jsonlData := op.SerializeJSONL(operations)
	contentSize := int64(len(jsonlData))
	contentHash := op.Hash(jsonlData)

	// Read the compressed file and calculate the compressed hash
	compressedData, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to read compressed file: %w", err)
	}
	compressedHash := op.Hash(compressedData)
	compressedSize := info.Size()

	// Calculate the chain hash
	chainHash := op.CalculateChainHash(parent, contentHash)

	return &BundleMetadata{
		BundleNumber:     bundleNumber,
		StartTime:        operations[0].CreatedAt,
		EndTime:          operations[len(operations)-1].CreatedAt,
		OperationCount:   len(operations),
		DIDCount:         len(dids),
		Hash:             chainHash,   // Chain hash (primary)
		ContentHash:      contentHash, // Content hash
		Parent:           parent,      // Parent chain hash
		CompressedHash:   compressedHash,
		CompressedSize:   compressedSize,
		UncompressedSize: contentSize,
		Cursor:           cursor,
		CreatedAt:        time.Now().UTC(),
	}, nil
}

// CalculateBundleMetadataFast calculates metadata quickly without the chain
// hash. Used during parallel scanning - the chain hash is calculated later,
// sequentially.
func (op *Operations) CalculateBundleMetadataFast(bundleNumber int, path string, operations []plc.PLCOperation, cursor string) (*BundleMetadata, error) {
	if len(operations) == 0 {
		return nil, fmt.Errorf("bundle is empty")
	}

	// Calculate hashes efficiently (read the file once)
	compressedHash, compressedSize, contentHash, contentSize, err := op.CalculateFileHashes(path)
	if err != nil {
		return nil, err
	}

	// Extract unique DIDs
	dids := op.ExtractUniqueDIDs(operations)

	// Note: Hash and Parent are left empty - they depend on bundle order and
	// are filled in later by a sequential pass
	return &BundleMetadata{
		BundleNumber:     bundleNumber,
		StartTime:        operations[0].CreatedAt,
		EndTime:          operations[len(operations)-1].CreatedAt,
		OperationCount:   len(operations),
		DIDCount:         len(dids),
		Hash:             "", // Chain hash - calculated later
		ContentHash:      contentHash,
		Parent:           "", // Parent - set later
		CompressedHash:   compressedHash,
		CompressedSize:   compressedSize,
		UncompressedSize: contentSize,
		Cursor:           cursor,
		CreatedAt:        time.Now().UTC(),
	}, nil
}
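
// Finalization sketch (hypothetical helper): after CalculateBundleMetadataFast
// has run in parallel, a single sequential pass fills in the order-dependent
// Parent and chain Hash fields.
func finalizeChain(op *Operations, metas []*BundleMetadata) {
	parent := ""
	for _, m := range metas {
		m.Parent = parent
		m.Hash = op.CalculateChainHash(parent, m.ContentHash)
		parent = m.Hash
	}
}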