+158
-18
internal/storage/storage.go
+158
-18
internal/storage/storage.go
···
8
8
"fmt"
9
9
"io"
10
10
"os"
11
+
"path/filepath"
11
12
"sync"
12
13
"time"
13
14
···
88
89
89
90
// LoadBundle loads a compressed bundle
90
91
func (op *Operations) LoadBundle(path string) ([]plcclient.PLCOperation, error) {
92
+
// 1. Read the entire compressed file into memory.
91
93
compressed, err := os.ReadFile(path)
92
94
if err != nil {
93
95
return nil, fmt.Errorf("failed to read file: %w", err)
94
96
}
95
97
98
+
// This is the key: The one-shot Decompress function is designed to correctly
99
+
// handle a byte slice containing one or more concatenated frames.
96
100
decompressed, err := gozstd.Decompress(nil, compressed)
97
101
if err != nil {
98
102
return nil, fmt.Errorf("failed to decompress: %w", err)
99
103
}
100
104
105
+
// 3. Parse the fully decompressed JSONL data.
101
106
return op.ParseJSONL(decompressed)
102
107
}
103
108
104
109
// SaveBundle saves operations to disk (compressed)
105
-
// Returns: contentHash, compressedHash, contentSize, compressedSize, error
106
110
func (op *Operations) SaveBundle(path string, operations []plcclient.PLCOperation) (string, string, int64, int64, error) {
111
+
// 1. Serialize all operations once to get a single, consistent content hash.
112
+
// This is critical for preserving chain hash integrity.
107
113
jsonlData := op.SerializeJSONL(operations)
108
114
contentSize := int64(len(jsonlData))
109
115
contentHash := op.Hash(jsonlData)
110
116
111
-
compressed, err := gozstd.Compress(nil, jsonlData)
117
+
// --- Correct Multi-Frame Streaming Logic ---
118
+
119
+
// 2. Create the destination file.
120
+
bundleFile, err := os.Create(path)
112
121
if err != nil {
113
-
return "", "", 0, 0, fmt.Errorf("failed to compress: %w", err)
122
+
return "", "", 0, 0, fmt.Errorf("could not create bundle file: %w", err)
114
123
}
124
+
defer bundleFile.Close() // Ensure the file is closed on exit.
115
125
116
-
compressedSize := int64(len(compressed))
117
-
compressedHash := op.Hash(compressed)
126
+
frameSize := 100 // Each frame will contain 100 operations.
127
+
frameOffsets := []int64{0} // The first frame always starts at offset 0.
128
+
129
+
// 3. Loop through operations in chunks.
130
+
for i := 0; i < len(operations); i += frameSize {
131
+
end := i + frameSize
132
+
if end > len(operations) {
133
+
end = len(operations)
134
+
}
135
+
opChunk := operations[i:end]
136
+
chunkJsonlData := op.SerializeJSONL(opChunk)
137
+
138
+
// a. Create a NEW zstd writer FOR EACH CHUNK. This is the key.
139
+
zstdWriter := gozstd.NewWriter(bundleFile)
140
+
141
+
// b. Write the uncompressed chunk to the zstd writer.
142
+
_, err := zstdWriter.Write(chunkJsonlData)
143
+
if err != nil {
144
+
zstdWriter.Close() // Attempt to clean up
145
+
return "", "", 0, 0, fmt.Errorf("failed to write frame data: %w", err)
146
+
}
118
147
119
-
if err := os.WriteFile(path, compressed, 0644); err != nil {
120
-
return "", "", 0, 0, fmt.Errorf("failed to write file: %w", err)
148
+
// c. Close the zstd writer. This finalizes the frame and flushes it
149
+
// to the underlying file. It does NOT close the bundleFile itself.
150
+
if err := zstdWriter.Close(); err != nil {
151
+
return "", "", 0, 0, fmt.Errorf("failed to close/finalize frame: %w", err)
152
+
}
153
+
154
+
// d. After closing the frame, get the file's new total size.
155
+
currentOffset, err := bundleFile.Seek(0, io.SeekCurrent)
156
+
if err != nil {
157
+
return "", "", 0, 0, fmt.Errorf("failed to get file offset: %w", err)
158
+
}
159
+
160
+
// e. Record this offset as the start of the next frame.
161
+
if end < len(operations) {
162
+
frameOffsets = append(frameOffsets, currentOffset)
163
+
}
121
164
}
122
165
123
-
return contentHash, compressedHash, contentSize, compressedSize, nil
166
+
// 4. Get the final total file size. This is the end of the last frame.
167
+
finalSize, _ := bundleFile.Seek(0, io.SeekCurrent)
168
+
frameOffsets = append(frameOffsets, finalSize)
169
+
170
+
// 5. Save the companion frame-offset index file.
171
+
indexPath := path + ".idx"
172
+
indexData, _ := json.Marshal(frameOffsets)
173
+
if err := os.WriteFile(indexPath, indexData, 0644); err != nil {
174
+
os.Remove(path) // Clean up to avoid inconsistent state.
175
+
return "", "", 0, 0, fmt.Errorf("failed to write frame index: %w", err)
176
+
}
177
+
178
+
// 6. Re-read the full compressed file to get its final hash for the main index.
179
+
compressedData, err := os.ReadFile(path)
180
+
if err != nil {
181
+
return "", "", 0, 0, fmt.Errorf("failed to re-read bundle for hashing: %w", err)
182
+
}
183
+
compressedHash := op.Hash(compressedData)
184
+
185
+
return contentHash, compressedHash, contentSize, finalSize, nil
124
186
}
125
187
126
188
// Pool for scanner buffers
···
137
199
return nil, fmt.Errorf("invalid position: %d", position)
138
200
}
139
201
202
+
frameSize := 100 // Must match the frame size used in SaveBundle
203
+
indexPath := path + ".idx"
204
+
205
+
// 1. Load the frame offset index.
206
+
indexData, err := os.ReadFile(indexPath)
207
+
if err != nil {
208
+
// If the frame index doesn't exist, fall back to the legacy full-scan method.
209
+
// This ensures backward compatibility with your old bundle files during migration.
210
+
if os.IsNotExist(err) {
211
+
op.logger.Printf("DEBUG: Frame index not found for %s, falling back to legacy full scan.", filepath.Base(path))
212
+
return op.loadOperationAtPositionLegacy(path, position)
213
+
}
214
+
return nil, fmt.Errorf("could not read frame index %s: %w", indexPath, err)
215
+
}
216
+
217
+
var frameOffsets []int64
218
+
if err := json.Unmarshal(indexData, &frameOffsets); err != nil {
219
+
return nil, fmt.Errorf("could not parse frame index %s: %w", indexPath, err)
220
+
}
221
+
222
+
// 2. Calculate target frame and the line number within that frame.
223
+
frameIndex := position / frameSize
224
+
lineInFrame := position % frameSize
225
+
226
+
if frameIndex >= len(frameOffsets)-1 {
227
+
return nil, fmt.Errorf("position %d is out of bounds for bundle with %d frames", position, len(frameOffsets)-1)
228
+
}
229
+
230
+
// 3. Get frame boundaries from the index.
231
+
startOffset := frameOffsets[frameIndex]
232
+
endOffset := frameOffsets[frameIndex+1]
233
+
frameLength := endOffset - startOffset
234
+
235
+
if frameLength <= 0 {
236
+
return nil, fmt.Errorf("invalid frame length calculated for position %d", position)
237
+
}
238
+
239
+
// 4. Open the bundle file.
240
+
bundleFile, err := os.Open(path)
241
+
if err != nil {
242
+
return nil, err
243
+
}
244
+
defer bundleFile.Close()
245
+
246
+
// 5. Read ONLY the bytes for that single frame from the correct offset.
247
+
compressedFrame := make([]byte, frameLength)
248
+
_, err = bundleFile.ReadAt(compressedFrame, startOffset)
249
+
if err != nil {
250
+
return nil, fmt.Errorf("failed to read frame %d from bundle: %w", frameIndex, err)
251
+
}
252
+
253
+
// 6. Decompress just that small frame.
254
+
decompressed, err := gozstd.Decompress(nil, compressedFrame)
255
+
if err != nil {
256
+
return nil, fmt.Errorf("failed to decompress frame %d: %w", frameIndex, err)
257
+
}
258
+
259
+
// 7. Scan the ~100 lines to get the target operation.
260
+
scanner := bufio.NewScanner(bytes.NewReader(decompressed))
261
+
lineNum := 0
262
+
for scanner.Scan() {
263
+
if lineNum == lineInFrame {
264
+
line := scanner.Bytes()
265
+
var operation plcclient.PLCOperation
266
+
if err := json.UnmarshalNoEscape(line, &operation); err != nil {
267
+
return nil, fmt.Errorf("failed to parse operation at position %d: %w", position, err)
268
+
}
269
+
operation.RawJSON = make([]byte, len(line))
270
+
copy(operation.RawJSON, line)
271
+
return &operation, nil
272
+
}
273
+
lineNum++
274
+
}
275
+
276
+
if err := scanner.Err(); err != nil {
277
+
return nil, fmt.Errorf("scanner error on frame %d: %w", frameIndex, err)
278
+
}
279
+
280
+
return nil, fmt.Errorf("operation at position %d not found", position)
281
+
}
282
+
283
+
func (op *Operations) loadOperationAtPositionLegacy(path string, position int) (*plcclient.PLCOperation, error) {
140
284
file, err := os.Open(path)
141
285
if err != nil {
142
286
return nil, fmt.Errorf("failed to open file: %w", err)
···
146
290
reader := gozstd.NewReader(file)
147
291
defer reader.Close()
148
292
149
-
bufPtr := scannerBufPool.Get().(*[]byte)
150
-
defer scannerBufPool.Put(bufPtr)
151
-
152
293
scanner := bufio.NewScanner(reader)
153
-
scanner.Buffer(*bufPtr, 512*1024)
294
+
// Use a larger buffer for potentially large lines
295
+
buf := make([]byte, 512*1024)
296
+
scanner.Buffer(buf, 1024*1024)
154
297
155
298
lineNum := 0
156
299
for scanner.Scan() {
157
300
if lineNum == position {
158
301
line := scanner.Bytes()
159
-
160
302
var operation plcclient.PLCOperation
161
303
if err := json.UnmarshalNoEscape(line, &operation); err != nil {
162
-
return nil, fmt.Errorf("failed to parse operation at position %d: %w", position, err)
304
+
return nil, fmt.Errorf("failed to parse legacy operation at position %d: %w", position, err)
163
305
}
164
-
165
306
operation.RawJSON = make([]byte, len(line))
166
307
copy(operation.RawJSON, line)
167
-
168
308
return &operation, nil
169
309
}
170
310
lineNum++
171
311
}
172
312
173
313
if err := scanner.Err(); err != nil {
174
-
return nil, fmt.Errorf("scanner error: %w", err)
314
+
return nil, fmt.Errorf("legacy scanner error: %w", err)
175
315
}
176
316
177
-
return nil, fmt.Errorf("position %d not found", position)
317
+
return nil, fmt.Errorf("position %d not found in legacy bundle", position)
178
318
}
179
319
180
320
// ========================================