A very experimental PLC implementation which uses BFT consensus for decentralization
package abciapp

import (
	"bufio"
	"bytes"
	"context"
	"crypto/sha256"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"slices"
	"strconv"
	"strings"
	"sync"
	"time"

	abcitypes "github.com/cometbft/cometbft/abci/types"
	"github.com/cosmos/iavl"
	"github.com/klauspost/compress/zstd"
	"github.com/palantir/stacktrace"
)

const snapshotChunkSize = 10 * 1024 * 1024 // 10 MB
const snapshotChunkHashSize = 32
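
// Snapshot file layout (as written by writeSnapshot and parsed by
// readSnapshotMetadata and snapshotApplier.Apply):
//
//	offset  0: file magic "didplcbft-snapshot" (18 bytes)
//	offset 18: 2 reserved bytes, then the format as a big-endian uint32 (currently 1)
//	offset 24: tree version/height, big-endian uint64
//	offset 32: IAVL tree root hash (32 bytes)
//	offset 64: compressed node list size in bytes, big-endian uint64
//	offset 72: node count, big-endian uint64
//	offset 80: zstd-compressed node list
//
// Chunks are cut over the entire file, header included, so chunk 0 always
// starts with this header.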

// ListSnapshots implements [types.Application].
func (d *DIDPLCApplication) ListSnapshots(context.Context, *abcitypes.RequestListSnapshots) (*abcitypes.ResponseListSnapshots, error) {
	files, err := filepath.Glob(filepath.Join(d.snapshotDirectory, "*.snapshot"))
	if err != nil {
		return nil, stacktrace.Propagate(err, "")
	}

	snapshots := make([]*abcitypes.Snapshot, 0, len(files))
	for _, filename := range files {
		s, err := readSnapshotMetadata(filename)
		if err != nil {
			return nil, stacktrace.Propagate(err, "")
		}

		snapshots = append(snapshots, s)
	}

	return &abcitypes.ResponseListSnapshots{
		Snapshots: snapshots,
	}, nil
}

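// readSnapshotMetadata derives the snapshot height from the filename,
// validates the snapshot file header, and derives the chunk count from the
// accompanying .chunksums file, which is also returned verbatim as the
// snapshot metadata.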
func readSnapshotMetadata(filename string) (*abcitypes.Snapshot, error) {
	// Extract height from filename pattern: %020d.snapshot
	base := filepath.Base(filename)
	if !strings.HasSuffix(base, ".snapshot") {
		return nil, stacktrace.NewError("invalid snapshot filename format: %s", filename)
	}
	heightStr := strings.TrimSuffix(base, ".snapshot")
	height, err := strconv.ParseInt(heightStr, 10, 64)
	if err != nil {
		return nil, stacktrace.Propagate(err, "failed to parse height from filename: %s", filename)
	}

	// Open and read snapshot file header
	f, err := os.Open(filename)
	if err != nil {
		return nil, stacktrace.Propagate(err, "failed to open snapshot file: %s", filename)
	}
	defer f.Close()

	// Read file magic (18 bytes)
	magic := make([]byte, 18)
	_, err = io.ReadFull(f, magic)
	if err != nil {
		return nil, stacktrace.Propagate(err, "failed to read file magic")
	}
	if string(magic) != "didplcbft-snapshot" {
		return nil, stacktrace.NewError("invalid file magic: expected 'didplcbft-snapshot', got '%s'", string(magic))
	}

	// Read version bytes (2 reserved bytes followed by the format as a big-endian uint32)
	versionBytes := make([]byte, 6)
	_, err = io.ReadFull(f, versionBytes)
	if err != nil {
		return nil, stacktrace.Propagate(err, "failed to read version bytes")
	}
	format := binary.BigEndian.Uint32(versionBytes[2:])

	// Read height (8 bytes, big-endian)
	heightBytes := make([]byte, 8)
	_, err = io.ReadFull(f, heightBytes)
	if err != nil {
		return nil, stacktrace.Propagate(err, "failed to read height")
	}
	fileHeight := int64(binary.BigEndian.Uint64(heightBytes))
	if fileHeight != height {
		return nil, stacktrace.NewError("height mismatch: filename indicates %d, file header contains %d", height, fileHeight)
	}

	// Read tree hash (32 bytes)
	hash := make([]byte, 32)
	_, err = io.ReadFull(f, hash)
	if err != nil {
		return nil, stacktrace.Propagate(err, "failed to read tree hash")
	}

	// Read corresponding chunksums file
	chunksumsFilename := strings.TrimSuffix(filename, ".snapshot") + ".chunksums"
	chunksumsData, err := os.ReadFile(chunksumsFilename)
	if err != nil {
		return nil, stacktrace.Propagate(err, "failed to read chunksums file: %s", chunksumsFilename)
	}

	// Calculate number of chunks (each chunk hash is 32 bytes)
	chunks := int64(len(chunksumsData)) / snapshotChunkHashSize

	return &abcitypes.Snapshot{
		Height:   uint64(height),
		Format:   format,
		Chunks:   uint32(chunks),
		Hash:     hash,
		Metadata: chunksumsData,
	}, nil
}

// LoadSnapshotChunk implements [types.Application].
func (d *DIDPLCApplication) LoadSnapshotChunk(_ context.Context, req *abcitypes.RequestLoadSnapshotChunk) (*abcitypes.ResponseLoadSnapshotChunk, error) {
	if req.Format != 1 {
		// just in case CometBFT asks us to load a chunk of a format we didn't declare support for in ListSnapshots...
		return nil, stacktrace.NewError("unsupported snapshot format")
	}

	// Construct filename from height using the same pattern as createSnapshot
	snapshotFilename := filepath.Join(d.snapshotDirectory, fmt.Sprintf("%020d.snapshot", req.Height))

	// Open the snapshot file
	f, err := os.Open(snapshotFilename)
	if err != nil {
		return nil, stacktrace.Propagate(err, "failed to open snapshot file: %s", snapshotFilename)
	}
	defer f.Close()

	// Calculate the offset for the requested chunk (chunks cover the whole file, header included)
	offset := int64(req.Chunk) * snapshotChunkSize
	_, err = f.Seek(offset, io.SeekStart)
	if err != nil {
		return nil, stacktrace.Propagate(err, "failed to seek to chunk offset")
	}

	// Read up to snapshotChunkSize bytes; io.ReadFull (unlike a bare Read) only
	// returns short on EOF, which is expected for the final chunk
	chunkData := make([]byte, snapshotChunkSize)
	n, err := io.ReadFull(f, chunkData)
	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
		return nil, stacktrace.Propagate(err, "failed to read chunk data")
	}

	// If we read less than snapshotChunkSize, trim the slice
	if n < snapshotChunkSize {
		chunkData = chunkData[:n]
	}

	return &abcitypes.ResponseLoadSnapshotChunk{
		Chunk: chunkData,
	}, nil
}

// ApplySnapshotChunk implements [types.Application].
func (d *DIDPLCApplication) ApplySnapshotChunk(_ context.Context, req *abcitypes.RequestApplySnapshotChunk) (*abcitypes.ResponseApplySnapshotChunk, error) {
	if d.snapshotApplier == nil {
		return nil, stacktrace.NewError("snapshot not offered yet, can't apply chunk")
	}

	err := d.snapshotApplier.Apply(int(req.Index), req.Chunk)
	if err != nil {
		if errors.Is(err, errMalformedChunk) {
			return &abcitypes.ResponseApplySnapshotChunk{
				Result:        abcitypes.ResponseApplySnapshotChunk_RETRY,
				RefetchChunks: []uint32{req.Index},
				RejectSenders: []string{req.Sender},
			}, nil
		} else if errors.Is(err, errTreeHashMismatch) {
			return &abcitypes.ResponseApplySnapshotChunk{
				Result:        abcitypes.ResponseApplySnapshotChunk_REJECT_SNAPSHOT,
				RejectSenders: []string{req.Sender},
			}, nil
		}
		return nil, stacktrace.Propagate(err, "failed to apply snapshot chunk")
	}

	if d.snapshotApplier.Done() {
		d.snapshotApplier = nil
		err := d.plc.RefreshTreeData()
		if err != nil {
			return nil, stacktrace.Propagate(err, "")
		}
	}

	return &abcitypes.ResponseApplySnapshotChunk{
		Result: abcitypes.ResponseApplySnapshotChunk_ACCEPT,
	}, nil
}

// OfferSnapshot implements [types.Application].
func (d *DIDPLCApplication) OfferSnapshot(_ context.Context, req *abcitypes.RequestOfferSnapshot) (*abcitypes.ResponseOfferSnapshot, error) {
	if d.snapshotApplier != nil {
		err := d.snapshotApplier.Abort()
		if err != nil {
			return nil, stacktrace.Propagate(err, "")
		}
	}

	if req.Snapshot.Format != 1 {
		return &abcitypes.ResponseOfferSnapshot{
			Result: abcitypes.ResponseOfferSnapshot_REJECT_FORMAT,
		}, nil
	}

	var err error
	d.snapshotApplier, err = d.beginApplyingSnapshot(int64(req.Snapshot.Height), req.AppHash, int(req.Snapshot.Chunks), req.Snapshot.Metadata)
	if err != nil {
		d.snapshotApplier = nil
		if errors.Is(err, errInvalidMetadata) {
			return &abcitypes.ResponseOfferSnapshot{
				Result: abcitypes.ResponseOfferSnapshot_REJECT_SENDER,
			}, nil
		}
		return nil, stacktrace.Propagate(err, "")
	}

	return &abcitypes.ResponseOfferSnapshot{
		Result: abcitypes.ResponseOfferSnapshot_ACCEPT,
	}, nil
}

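// createSnapshot exports the given tree version to tempFilename, re-reads the
// finished file to produce the per-chunk SHA-256 .chunksums sidecar, and then
// renames the snapshot into the snapshot directory under its final name.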
func (d *DIDPLCApplication) createSnapshot(treeVersion int64, tempFilename string) error {
	it, err := d.tree.GetImmutable(treeVersion)
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	// Delete tempFilename if it exists to ensure a fresh file is created
	_ = os.Remove(tempFilename)

	f, err := os.Create(tempFilename)
	if err != nil {
		return stacktrace.Propagate(err, "")
	}
	defer f.Close()

	st := time.Now()

	err = writeSnapshot(f, it)
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	err = f.Sync()
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	hf, err := os.Create(filepath.Join(d.snapshotDirectory, fmt.Sprintf("%020d.chunksums", treeVersion)))
	if err != nil {
		return stacktrace.Propagate(err, "")
	}
	defer hf.Close()

	err = writeChunkHashes(f, hf)
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	err = hf.Sync()
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	err = f.Close()
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	err = os.Rename(tempFilename, filepath.Join(d.snapshotDirectory, fmt.Sprintf("%020d.snapshot", treeVersion)))
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	fmt.Println("Took", time.Since(st), "to export")

	return nil
}

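// writeSnapshot writes the snapshot header followed by the zstd-compressed
// node list. The compressed size and node count are not known up front, so
// space is reserved in the header and back-patched by seeking once the
// export finishes.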
func writeSnapshot(writerSeeker io.WriteSeeker, it *iavl.ImmutableTree) error {
	writtenUntilReservedFields := 0

	bw := bufio.NewWriter(writerSeeker)

	// file magic and version
	c, err := bw.Write([]byte("didplcbft-snapshot"))
	if err != nil {
		return stacktrace.Propagate(err, "")
	}
	writtenUntilReservedFields += c

	// 2 reserved bytes, then format 1 as a big-endian uint32
	c, err = bw.Write([]byte{0, 0, 0, 0, 0, 1})
	if err != nil {
		return stacktrace.Propagate(err, "")
	}
	writtenUntilReservedFields += c

	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(it.Version()))
	c, err = bw.Write(b)
	if err != nil {
		return stacktrace.Propagate(err, "")
	}
	writtenUntilReservedFields += c

	c, err = bw.Write(it.Hash())
	if err != nil {
		return stacktrace.Propagate(err, "")
	}
	writtenUntilReservedFields += c

	// reserve space for writing number of bytes, number of nodes
	// 8 bytes for node list size in bytes
	// 8 bytes for number of nodes
	sizeOfReservedFields := 8 + 8
	b = make([]byte, sizeOfReservedFields)
	_, err = bw.Write(b)
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	zstdw, err := zstd.NewWriter(bw, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	numNodes, err := exportNodes(it, zstdw)
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	err = zstdw.Close()
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	err = bw.Flush()
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	// find the total size of the compressed node list
	offset, err := writerSeeker.Seek(0, io.SeekCurrent)
	if err != nil {
		return stacktrace.Propagate(err, "")
	}
	compressedNodeListSize := offset - int64(writtenUntilReservedFields) - int64(sizeOfReservedFields)

	// seek back and fill in the reserved header fields

	offset, err = writerSeeker.Seek(int64(writtenUntilReservedFields), io.SeekStart)
	if err != nil {
		return stacktrace.Propagate(err, "")
	}
	if offset != int64(writtenUntilReservedFields) {
		return stacktrace.NewError("unexpected seek result")
	}

	b = make([]byte, sizeOfReservedFields)
	binary.BigEndian.PutUint64(b, uint64(compressedNodeListSize))
	binary.BigEndian.PutUint64(b[8:], uint64(numNodes))
	_, err = writerSeeker.Write(b)
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	return nil
}

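// exportNodes streams the tree's nodes through the compressing exporter and
// writes one record per node: 1 byte height, 8 bytes version, 4 bytes key
// length, 4 bytes value length (0xffffffff marks a nil key/value), then the
// raw key and value bytes. It returns the number of exported nodes.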
func exportNodes(it *iavl.ImmutableTree, w io.Writer) (int64, error) {
	exporter, err := it.Export()
	if err != nil {
		return 0, stacktrace.Propagate(err, "")
	}
	defer exporter.Close()
	cexporter := iavl.NewCompressExporter(exporter)

	numNodes := int64(0)
	for {
		node, err := cexporter.Next()
		if errors.Is(err, iavl.ErrorExportDone) {
			break
		}
		if err != nil {
			return 0, stacktrace.Propagate(err, "")
		}

		b := make([]byte, 9)
		b[0] = byte(node.Height)

		binary.BigEndian.PutUint64(b[1:], uint64(node.Version))
		_, err = w.Write(b)
		if err != nil {
			return 0, stacktrace.Propagate(err, "")
		}

		// nil node values are different from 0-byte values
		b = []byte{0xff, 0xff, 0xff, 0xff}
		if node.Key != nil {
			binary.BigEndian.PutUint32(b, uint32(len(node.Key)))
		}
		_, err = w.Write(b)
		if err != nil {
			return 0, stacktrace.Propagate(err, "")
		}

		b = []byte{0xff, 0xff, 0xff, 0xff}
		if node.Value != nil {
			binary.BigEndian.PutUint32(b, uint32(len(node.Value)))
		}
		_, err = w.Write(b)
		if err != nil {
			return 0, stacktrace.Propagate(err, "")
		}

		_, err = w.Write(node.Key)
		if err != nil {
			return 0, stacktrace.Propagate(err, "")
		}

		_, err = w.Write(node.Value)
		if err != nil {
			return 0, stacktrace.Propagate(err, "")
		}
		numNodes++
	}

	return numNodes, nil
}

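// writeChunkHashes re-reads the finished snapshot file from the start and
// writes the SHA-256 hash of each snapshotChunkSize-sized chunk (the final
// chunk may be shorter) to w.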
func writeChunkHashes(snapshotFile io.ReadSeeker, w io.Writer) error {
	bw := bufio.NewWriter(w)

	_, err := snapshotFile.Seek(0, io.SeekStart)
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	buf := make([]byte, snapshotChunkSize)
	for {
		n, err := io.ReadFull(snapshotFile, buf)
		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
			return stacktrace.Propagate(err, "")
		}
		if n == 0 {
			break
		}

		hash := sha256.Sum256(buf[:n])
		c, err := bw.Write(hash[:])
		if err != nil {
			return stacktrace.Propagate(err, "")
		}

		if c != snapshotChunkHashSize {
			return stacktrace.NewError("unexpected chunk hash size")
		}

		if n < snapshotChunkSize {
			break
		}
	}

	err = bw.Flush()
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	return nil
}

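// snapshotApplier reassembles an offered snapshot as chunks arrive: each
// verified chunk is fed into an io.Pipe, decompressed by a zstd reader, and
// consumed by streamingImporter on a separate goroutine, which decodes node
// records and adds them to the IAVL importer.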
type snapshotApplier struct {
	tree                *iavl.MutableTree
	treeVersion         int64
	expectedFinalHash   []byte
	expectedChunkHashes [][]byte

	pipeWriter *io.PipeWriter
	pipeReader *io.PipeReader
	zstdReader io.ReadCloser

	importer         *iavl.Importer
	compressImporter iavl.NodeImporter
	importerWg       sync.WaitGroup

	numImportedNodes int
	claimedNodeCount int
	done             bool
}

var errMalformedChunk = errors.New("malformed chunk")
var errInvalidMetadata = errors.New("invalid metadata")
var errTreeHashMismatch = errors.New("tree hash mismatch")

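// beginApplyingSnapshot validates the chunksums metadata against the declared
// chunk count, clears any existing tree state, and wires up the import
// pipeline (io.Pipe, zstd decoder, and IAVL importer) that Apply and
// streamingImporter feed and drain.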
func (d *DIDPLCApplication) beginApplyingSnapshot(treeVersion int64, expectedFinalHash []byte, expectedNumChunks int, chunksums []byte) (*snapshotApplier, error) {
	if len(chunksums)%snapshotChunkHashSize != 0 || len(chunksums)/snapshotChunkHashSize != expectedNumChunks {
		return nil, stacktrace.Propagate(errInvalidMetadata, "")
	}

	if !d.tree.IsEmpty() {
		err := d.fullyClearTree()
		if err != nil {
			return nil, stacktrace.Propagate(err, "")
		}
	}

	importer, err := d.tree.Import(treeVersion)
	if err != nil {
		return nil, stacktrace.Propagate(err, "")
	}

	pipeReader, pipeWriter := io.Pipe()

	zstdReader, err := zstd.NewReader(pipeReader)
	if err != nil {
		return nil, stacktrace.Propagate(err, "")
	}

	chunkHashes := make([][]byte, 0, expectedNumChunks)
	for hash := range slices.Chunk(chunksums, snapshotChunkHashSize) {
		chunkHashes = append(chunkHashes, hash)
	}

	return &snapshotApplier{
		tree:                d.tree,
		treeVersion:         treeVersion,
		expectedFinalHash:   expectedFinalHash,
		expectedChunkHashes: chunkHashes,

		pipeWriter: pipeWriter,
		pipeReader: pipeReader,
		zstdReader: zstdReader.IOReadCloser(),

		importer:         importer,
		compressImporter: iavl.NewCompressImporter(importer),
	}, nil
}

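// Apply verifies a chunk against its expected hash and streams it to the
// importer goroutine. Chunk 0 additionally carries the file header, which is
// validated against the offered snapshot before the compressed payload
// begins. On the final chunk, Apply waits for the importer, commits the tree,
// and checks the resulting root hash.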
func (a *snapshotApplier) Apply(chunkIndex int, chunkBytes []byte) error {
	if chunkIndex < 0 || chunkIndex >= len(a.expectedChunkHashes) {
		return stacktrace.Propagate(errMalformedChunk, "chunk index out of range")
	}
	if len(chunkBytes) > snapshotChunkSize {
		return stacktrace.Propagate(errMalformedChunk, "chunk too large")
	}
	hash := sha256.Sum256(chunkBytes)
	if !bytes.Equal(a.expectedChunkHashes[chunkIndex], hash[:]) {
		return stacktrace.Propagate(errMalformedChunk, "hash mismatch")
	}

	if chunkIndex == 0 {
		if len(chunkBytes) < 80 {
			return stacktrace.Propagate(errMalformedChunk, "chunk too small")
		}

		if string(chunkBytes[0:18]) != "didplcbft-snapshot" {
			return stacktrace.Propagate(errMalformedChunk, "invalid file magic")
		}

		if binary.BigEndian.Uint32(chunkBytes[20:]) != 1 {
			return stacktrace.Propagate(errMalformedChunk, "invalid snapshot format")
		}

		if binary.BigEndian.Uint64(chunkBytes[24:]) != uint64(a.treeVersion) {
			return stacktrace.Propagate(errMalformedChunk, "mismatched tree version")
		}

		if !bytes.Equal(chunkBytes[32:64], a.expectedFinalHash) {
			return stacktrace.Propagate(errMalformedChunk, "mismatched declared tree hash")
		}

		declaredFileSize := 80 + binary.BigEndian.Uint64(chunkBytes[64:])
		minExpectedSize := uint64((len(a.expectedChunkHashes) - 1) * snapshotChunkSize)
		maxExpectedSize := uint64(len(a.expectedChunkHashes) * snapshotChunkSize)
		if declaredFileSize < minExpectedSize ||
			declaredFileSize > maxExpectedSize {
			return stacktrace.Propagate(errMalformedChunk, "unexpected compressed node list length")
		}

		a.claimedNodeCount = int(binary.BigEndian.Uint64(chunkBytes[72:]))

		// move to the start of the compressed portion
		chunkBytes = chunkBytes[80:]

		a.importerWg.Go(a.streamingImporter)
	}

	isLastChunk := chunkIndex == len(a.expectedChunkHashes)-1
	go func(b []byte) {
		// From the io.Pipe docs:
		// It is safe to call Read and Write in parallel with each other or with Close.
		// Parallel calls to Read and parallel calls to Write are also safe:
		// the individual calls will be gated sequentially.

		// So even if not everything gets written from this chunk (e.g. because the zstd decoder decided not to advance),
		// it'll eventually be written, in the correct order.
		_, _ = a.pipeWriter.Write(b)
		if isLastChunk {
			_ = a.pipeWriter.Close()
		}
	}(chunkBytes)

	if isLastChunk {
		// wait for the importer to finish reading and importing everything
		a.importerWg.Wait()

		if a.numImportedNodes != a.claimedNodeCount {
			return stacktrace.Propagate(errTreeHashMismatch, "imported node count mismatch")
		}

		err := a.importer.Commit()
		if err != nil {
			if strings.Contains(err.Error(), "invalid node structure") {
				return stacktrace.Propagate(errors.Join(errMalformedChunk, err), "")
			}
			return stacktrace.Propagate(err, "")
		}

		a.closeCommons()
		a.done = true

		if !bytes.Equal(a.tree.Hash(), a.expectedFinalHash) {
			return stacktrace.Propagate(errTreeHashMismatch, "")
		}
	}

	return nil
}

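// streamingImporter runs on its own goroutine, decoding node records from the
// decompressed stream (see exportNodes for the record layout) and feeding
// them into the IAVL importer. It returns silently on any decoding error:
// the resulting node count / tree hash mismatch is caught in Apply.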
func (a *snapshotApplier) streamingImporter() {
	for {
		nodeHeader := make([]byte, 9+4+4)
		n, err := io.ReadFull(a.zstdReader, nodeHeader)
		if err != nil || n != 9+4+4 {
			// err may be EOF here, which is expected
			return
		}

		// validate lengths against sensible limits to prevent OOM DoS by malicious third parties
		keyLength := binary.BigEndian.Uint32(nodeHeader[9:13])
		var key []byte
		if keyLength != 0xffffffff {
			if keyLength > 1024*1024 {
				return
			}
			key = make([]byte, keyLength)

			n, err = io.ReadFull(a.zstdReader, key)
			if err != nil || n != len(key) {
				// this shouldn't happen unless the data is corrupt.
				// we can return silently here because, since we didn't import all nodes, the tree hash won't match anyway
				return
			}
		}

		valueLength := binary.BigEndian.Uint32(nodeHeader[13:17])
		var value []byte
		if valueLength != 0xffffffff {
			if valueLength > 1024*1024 {
				return
			}
			value = make([]byte, valueLength)
			n, err = io.ReadFull(a.zstdReader, value)
			if err != nil || n != len(value) {
				return
			}
		}

		err = a.compressImporter.Add(&iavl.ExportNode{
			Height:  int8(nodeHeader[0]),
			Version: int64(binary.BigEndian.Uint64(nodeHeader[1:9])),
			Key:     key,
			Value:   value,
		})
		if err != nil {
			// this shouldn't happen unless the data is corrupt.
			// we can return silently here because, since we didn't import all nodes, the tree hash won't match anyway
			return
		}
		a.numImportedNodes++
	}
}

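// Abort tears down the import pipeline and deletes any partially imported
// tree versions so a different snapshot can be offered.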
func (a *snapshotApplier) Abort() error {
	err := a.closeCommons()
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	err = a.tree.DeleteVersionsFrom(0)
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	return nil
}

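// closeCommons closes the decompression pipeline, unblocking the importer
// goroutine if it is still reading, then waits for it to exit before closing
// the IAVL importer.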
func (a *snapshotApplier) closeCommons() error {
	err := a.zstdReader.Close()
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	err = a.pipeReader.Close()
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	err = a.pipeWriter.Close()
	if err != nil {
		return stacktrace.Propagate(err, "")
	}

	a.importerWg.Wait()
	a.importer.Close()

	return nil
}

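// Done reports whether the snapshot has been fully applied and committed.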
func (a *snapshotApplier) Done() bool {
	return a.done
}
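
// The sketch below is illustrative and not used by the application: it shows
// how a client could verify a fetched chunk against the chunksums metadata
// that ListSnapshots advertises. The function name and parameters are
// hypothetical.
func verifyChunkAgainstMetadata(snapshot *abcitypes.Snapshot, chunkIndex int, chunk []byte) bool {
	start := chunkIndex * snapshotChunkHashSize
	end := start + snapshotChunkHashSize
	if chunkIndex < 0 || end > len(snapshot.Metadata) {
		return false
	}
	// each 32-byte entry in Metadata is the SHA-256 hash of one snapshotChunkSize-sized chunk
	actual := sha256.Sum256(chunk)
	return bytes.Equal(snapshot.Metadata[start:end], actual[:])
}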