A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
package bundle

import (
	"bufio"
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"time"

	"github.com/klauspost/compress/zstd"
	"tangled.org/atscan.net/plcbundle/plc"
)

// Operations handles low-level bundle file operations
type Operations struct {
	encoder *zstd.Encoder
	decoder *zstd.Decoder
	logger  Logger
}

// NewOperations creates a new Operations handler with default compression
func NewOperations(logger Logger) (*Operations, error) {
	// Always use default compression (level 3 - good balance)
	encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault))
	if err != nil {
		return nil, fmt.Errorf("failed to create zstd encoder: %w", err)
	}

	decoder, err := zstd.NewReader(nil)
	if err != nil {
		return nil, fmt.Errorf("failed to create zstd decoder: %w", err)
	}

	return &Operations{
		encoder: encoder,
		decoder: decoder,
		logger:  logger,
	}, nil
}

// Close cleans up resources
func (op *Operations) Close() {
	if op.encoder != nil {
		op.encoder.Close()
	}
	if op.decoder != nil {
		op.decoder.Close()
	}
}
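
// Example (a minimal usage sketch; "logger" is assumed to satisfy this
// package's Logger interface): construct one handler, reuse its shared
// encoder/decoder across calls, and release them with Close when done.
//
//	ops, err := NewOperations(logger)
//	if err != nil {
//		return err
//	}
//	defer ops.Close()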

// ========================================
// CORE SERIALIZATION (JSONL)
// ========================================

// SerializeJSONL serializes operations to newline-delimited JSON.
// This is the ONE method everyone should use for serialization.
func (op *Operations) SerializeJSONL(operations []plc.PLCOperation) []byte {
	var buf bytes.Buffer

	for _, operation := range operations {
		// Use RawJSON if available (preserves the exact upstream format)
		if len(operation.RawJSON) > 0 {
			buf.Write(operation.RawJSON)
		} else {
			// Fall back to marshaling. A marshal error is not expected for
			// a plain data struct, so it is deliberately ignored here.
			data, _ := json.Marshal(operation)
			buf.Write(data)
		}
		buf.WriteByte('\n')
	}

	return buf.Bytes()
}

// ParseJSONL parses newline-delimited JSON into operations.
// This is the ONE method everyone should use for deserialization.
func (op *Operations) ParseJSONL(data []byte) ([]plc.PLCOperation, error) {
	var operations []plc.PLCOperation
	scanner := bufio.NewScanner(bytes.NewReader(data))

	// Set a large buffer for long lines (up to 1 MiB per line)
	buf := make([]byte, 0, 64*1024)
	scanner.Buffer(buf, 1024*1024)

	lineNum := 0
	for scanner.Scan() {
		lineNum++
		line := scanner.Bytes()

		if len(line) == 0 {
			continue
		}

		var operation plc.PLCOperation
		if err := json.Unmarshal(line, &operation); err != nil {
			return nil, fmt.Errorf("failed to parse line %d: %w", lineNum, err)
		}

		// Store the raw line so later serialization is byte-identical
		operation.RawJSON = make([]byte, len(line))
		copy(operation.RawJSON, line)

		operations = append(operations, operation)
	}

	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scanner error: %w", err)
	}

	return operations, nil
}
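
// Example (sketch): a serialize/parse round trip. Because ParseJSONL copies
// each raw line into RawJSON and SerializeJSONL prefers RawJSON, re-serializing
// parsed operations reproduces the original bytes - which is what keeps
// content hashes stable across load/save cycles.
//
//	jsonl := ops.SerializeJSONL(operations)
//	parsed, err := ops.ParseJSONL(jsonl)
//	if err != nil {
//		return err
//	}
//	// ops.Hash(ops.SerializeJSONL(parsed)) == ops.Hash(jsonl)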

// ========================================
// FILE OPERATIONS (uses JSONL + compression)
// ========================================

// LoadBundle loads a compressed bundle from disk
func (op *Operations) LoadBundle(path string) ([]plc.PLCOperation, error) {
	// Read compressed file
	compressed, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to read file: %w", err)
	}

	// Decompress
	decompressed, err := op.decoder.DecodeAll(compressed, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to decompress: %w", err)
	}

	// Parse JSONL
	return op.ParseJSONL(decompressed)
}

// SaveBundle saves operations to disk (compressed).
// Returns: contentHash, compressedHash, contentSize, compressedSize, error.
func (op *Operations) SaveBundle(path string, operations []plc.PLCOperation) (string, string, int64, int64, error) {
	// Serialize to JSONL
	jsonlData := op.SerializeJSONL(operations)
	contentSize := int64(len(jsonlData))
	contentHash := op.Hash(jsonlData)

	// Compress
	compressed := op.encoder.EncodeAll(jsonlData, nil)
	compressedSize := int64(len(compressed))
	compressedHash := op.Hash(compressed)

	// Write to file
	if err := os.WriteFile(path, compressed, 0644); err != nil {
		return "", "", 0, 0, fmt.Errorf("failed to write file: %w", err)
	}

	return contentHash, compressedHash, contentSize, compressedSize, nil
}
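
// Example (sketch): save a bundle, then verify the file before trusting it.
// The hashes returned by SaveBundle are meant to be recorded as metadata and
// checked later with VerifyHash (defined below), which hashes the compressed
// bytes on disk.
//
//	contentHash, compressedHash, _, _, err := ops.SaveBundle(path, operations)
//	if err != nil {
//		return err
//	}
//	ok, actual, err := ops.VerifyHash(path, compressedHash)
//	if err != nil || !ok {
//		return fmt.Errorf("bundle mismatch: got %s", actual)
//	}
//	loaded, err := ops.LoadBundle(path)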

// ========================================
// STREAMING
// ========================================

// StreamRaw returns a reader for the raw compressed bundle file
func (op *Operations) StreamRaw(path string) (io.ReadCloser, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open bundle: %w", err)
	}
	return file, nil
}

// StreamDecompressed returns a reader for decompressed bundle data
func (op *Operations) StreamDecompressed(path string) (io.ReadCloser, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open bundle: %w", err)
	}

	// Create a new decoder for this stream
	decoder, err := zstd.NewReader(file)
	if err != nil {
		file.Close()
		return nil, fmt.Errorf("failed to create decompressor: %w", err)
	}

	// Return a wrapper that closes both the decoder and the file
	return &decompressedReader{
		decoder: decoder,
		file:    file,
	}, nil
}

// decompressedReader wraps a zstd decoder and its underlying file
type decompressedReader struct {
	decoder *zstd.Decoder
	file    *os.File
}

func (dr *decompressedReader) Read(p []byte) (int, error) {
	return dr.decoder.Read(p)
}

func (dr *decompressedReader) Close() error {
	dr.decoder.Close()
	return dr.file.Close()
}
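
// Example (sketch): stream a bundle without holding it all in memory, e.g.
// when serving the decompressed JSONL over HTTP ("w" here stands for any
// io.Writer). The returned ReadCloser owns both the decoder and the file,
// so a single Close releases everything.
//
//	rc, err := ops.StreamDecompressed(path)
//	if err != nil {
//		return err
//	}
//	defer rc.Close()
//	_, err = io.Copy(w, rc)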

// ========================================
// HASHING
// ========================================

// Hash computes SHA256 hash of data
func (op *Operations) Hash(data []byte) string {
	h := sha256.Sum256(data)
	return hex.EncodeToString(h[:])
}

// CalculateChainHash calculates the cumulative chain hash
func (op *Operations) CalculateChainHash(parent string, contentHash string) string {
	var data string
	if parent == "" {
		// Genesis bundle (first bundle)
		data = "plcbundle:genesis:" + contentHash
	} else {
		// Subsequent bundles - chain the parent hash with the current content
		data = parent + ":" + contentHash
	}
	return op.Hash([]byte(data))
}
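
// Example (sketch): how chain hashes link bundles. Bundle 0 has no parent, so
// it is hashed with the "plcbundle:genesis:" prefix; every later bundle hashes
// its parent's chain hash together with its own content hash. Altering any
// bundle therefore changes every chain hash after it, which is what makes a
// mirrored bundle history verifiable.
//
//	h0 := ops.CalculateChainHash("", contentHash0) // genesis bundle
//	h1 := ops.CalculateChainHash(h0, contentHash1)
//	h2 := ops.CalculateChainHash(h1, contentHash2)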

// CalculateFileHashes calculates both content and compressed hashes efficiently
func (op *Operations) CalculateFileHashes(path string) (compressedHash string, compressedSize int64, contentHash string, contentSize int64, err error) {
	// Read compressed file
	compressedData, err := os.ReadFile(path)
	if err != nil {
		return "", 0, "", 0, fmt.Errorf("failed to read file: %w", err)
	}

	// Calculate compressed hash
	compressedHash = op.Hash(compressedData)
	compressedSize = int64(len(compressedData))

	// Decompress
	decompressed, err := op.decoder.DecodeAll(compressedData, nil)
	if err != nil {
		return "", 0, "", 0, fmt.Errorf("failed to decompress: %w", err)
	}

	// Calculate content hash
	contentHash = op.Hash(decompressed)
	contentSize = int64(len(decompressed))

	return compressedHash, compressedSize, contentHash, contentSize, nil
}

// VerifyHash verifies the hash of a bundle file
func (op *Operations) VerifyHash(path string, expectedHash string) (bool, string, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return false, "", fmt.Errorf("failed to read file: %w", err)
	}

	actualHash := op.Hash(data)
	return actualHash == expectedHash, actualHash, nil
}

// ========================================
// UTILITY FUNCTIONS
// ========================================

// FileExists checks if a file exists
func (op *Operations) FileExists(path string) bool {
	_, err := os.Stat(path)
	return err == nil
}

// GetFileSize returns the size of a file
func (op *Operations) GetFileSize(path string) (int64, error) {
	info, err := os.Stat(path)
	if err != nil {
		return 0, err
	}
	return info.Size(), nil
}

// ExtractUniqueDIDs extracts unique DIDs from operations
func (op *Operations) ExtractUniqueDIDs(operations []plc.PLCOperation) []string {
	didSet := make(map[string]bool)
	for _, operation := range operations {
		didSet[operation.DID] = true
	}

	dids := make([]string, 0, len(didSet))
	for did := range didSet {
		dids = append(dids, did)
	}

	return dids
}

// GetBoundaryCIDs returns the CIDs of all operations that share the same
// timestamp as the last operation in the slice
func (op *Operations) GetBoundaryCIDs(operations []plc.PLCOperation) (time.Time, map[string]bool) {
	if len(operations) == 0 {
		return time.Time{}, nil
	}

	lastOp := operations[len(operations)-1]
	boundaryTime := lastOp.CreatedAt
	cidSet := make(map[string]bool)

	// Walk backwards from the end, collecting CIDs until the timestamp
	// changes (operations are assumed to be sorted by CreatedAt)
	for i := len(operations) - 1; i >= 0; i-- {
		operation := operations[i]
		if operation.CreatedAt.Equal(boundaryTime) {
			cidSet[operation.CID] = true
		} else {
			break
		}
	}

	return boundaryTime, cidSet
}

// StripBoundaryDuplicates removes leading operations whose timestamps match
// boundaryTimestamp and whose CIDs appear in prevBoundaryCIDs
func (op *Operations) StripBoundaryDuplicates(operations []plc.PLCOperation, boundaryTimestamp string, prevBoundaryCIDs map[string]bool) []plc.PLCOperation {
	if len(operations) == 0 || len(prevBoundaryCIDs) == 0 {
		return operations
	}

	boundaryTime, err := time.Parse(time.RFC3339Nano, boundaryTimestamp)
	if err != nil {
		// An unparsable timestamp means we cannot safely deduplicate
		return operations
	}

	startIdx := 0
	for startIdx < len(operations) {
		operation := operations[startIdx]

		// Past the boundary timestamp: nothing further can be a duplicate
		if operation.CreatedAt.After(boundaryTime) {
			break
		}

		// Exactly at the boundary and already seen: skip it
		if operation.CreatedAt.Equal(boundaryTime) && prevBoundaryCIDs[operation.CID] {
			startIdx++
			continue
		}

		break
	}

	return operations[startIdx:]
}
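
// Example (sketch): deduplicating across fetches. A timestamp-based cursor can
// re-deliver operations that share the previous batch's final timestamp, so a
// syncer records the boundary CIDs of one batch and strips them from the next.
// Variable names here are illustrative.
//
//	boundaryTime, cids := ops.GetBoundaryCIDs(prevBatch)
//	nextBatch = ops.StripBoundaryDuplicates(nextBatch,
//		boundaryTime.Format(time.RFC3339Nano), cids)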

// CreateBundle creates a complete bundle structure from operations.
// The slice must be non-empty; callers are expected to pass exactly
// BUNDLE_SIZE operations.
func (op *Operations) CreateBundle(bundleNumber int, operations []plc.PLCOperation, cursor string, parent string) *Bundle {
	if len(operations) != BUNDLE_SIZE {
		op.logger.Printf("Warning: bundle has %d operations, expected %d", len(operations), BUNDLE_SIZE)
	}

	dids := op.ExtractUniqueDIDs(operations)
	_, boundaryCIDs := op.GetBoundaryCIDs(operations)

	// Convert boundary CIDs map to slice
	cidSlice := make([]string, 0, len(boundaryCIDs))
	for cid := range boundaryCIDs {
		cidSlice = append(cidSlice, cid)
	}

	bundle := &Bundle{
		BundleNumber: bundleNumber,
		StartTime:    operations[0].CreatedAt,
		EndTime:      operations[len(operations)-1].CreatedAt,
		Operations:   operations,
		DIDCount:     len(dids),
		Cursor:       cursor,
		Parent:       parent,
		BoundaryCIDs: cidSlice,
		Compressed:   true,
		CreatedAt:    time.Now().UTC(),
	}

	return bundle
}

// ========================================
// METADATA CALCULATION
// ========================================

// CalculateBundleMetadata calculates complete metadata for a bundle.
// This is the ONE method everyone should use for metadata calculation.
func (op *Operations) CalculateBundleMetadata(bundleNumber int, path string, operations []plc.PLCOperation, parent string, cursor string) (*BundleMetadata, error) {
	if len(operations) == 0 {
		return nil, fmt.Errorf("bundle is empty")
	}

	// Extract unique DIDs
	dids := op.ExtractUniqueDIDs(operations)

	// Serialize to JSONL and calculate the content hash
	jsonlData := op.SerializeJSONL(operations)
	contentSize := int64(len(jsonlData))
	contentHash := op.Hash(jsonlData)

	// Read the compressed file once; its length is the compressed size
	compressedData, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to read compressed file: %w", err)
	}
	compressedHash := op.Hash(compressedData)
	compressedSize := int64(len(compressedData))

	// Calculate chain hash
	chainHash := op.CalculateChainHash(parent, contentHash)

	return &BundleMetadata{
		BundleNumber:     bundleNumber,
		StartTime:        operations[0].CreatedAt,
		EndTime:          operations[len(operations)-1].CreatedAt,
		OperationCount:   len(operations),
		DIDCount:         len(dids),
		Hash:             chainHash,   // Chain hash (primary)
		ContentHash:      contentHash, // Content hash of the JSONL
		Parent:           parent,      // Parent chain hash
		CompressedHash:   compressedHash,
		CompressedSize:   compressedSize,
		UncompressedSize: contentSize,
		Cursor:           cursor,
		CreatedAt:        time.Now().UTC(),
	}, nil
}

// CalculateBundleMetadataFast calculates metadata quickly without the chain hash.
// Used during parallel scanning - the chain hash is calculated later sequentially.
func (op *Operations) CalculateBundleMetadataFast(bundleNumber int, path string, operations []plc.PLCOperation, cursor string) (*BundleMetadata, error) {
	if len(operations) == 0 {
		return nil, fmt.Errorf("bundle is empty")
	}

	// Calculate hashes efficiently (read the file once)
	compressedHash, compressedSize, contentHash, contentSize, err := op.CalculateFileHashes(path)
	if err != nil {
		return nil, err
	}

	// Extract unique DIDs
	dids := op.ExtractUniqueDIDs(operations)

	// Note: Hash and Parent are left empty - they are filled in later,
	// sequentially, once the preceding bundle's chain hash is known
	return &BundleMetadata{
		BundleNumber:     bundleNumber,
		StartTime:        operations[0].CreatedAt,
		EndTime:          operations[len(operations)-1].CreatedAt,
		OperationCount:   len(operations),
		DIDCount:         len(dids),
		Hash:             "", // Chain hash - calculated later
		ContentHash:      contentHash,
		Parent:           "", // Parent - set later
		CompressedHash:   compressedHash,
		CompressedSize:   compressedSize,
		UncompressedSize: contentSize,
		Cursor:           cursor,
		CreatedAt:        time.Now().UTC(),
	}, nil
}
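
// Example (sketch): the two-phase scan the Fast variant enables. Per-bundle
// hashing needs no neighbors, so phase 1 can run in parallel across bundles;
// phase 2 then threads the chain hash through the results in a cheap
// sequential pass ("metas" is an assumed slice ordered by bundle number).
//
//	parent := ""
//	for _, m := range metas {
//		m.Parent = parent
//		m.Hash = ops.CalculateChainHash(parent, m.ContentHash)
//		parent = m.Hash
//	}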