A very experimental PLC implementation which uses BFT consensus for decentralization
at main 20 kB view raw
1package abciapp 2 3import ( 4 "bufio" 5 "bytes" 6 "context" 7 "crypto/sha256" 8 "encoding/binary" 9 "errors" 10 "fmt" 11 "io" 12 "os" 13 "path/filepath" 14 "slices" 15 "strconv" 16 "strings" 17 "sync" 18 "time" 19 20 abcitypes "github.com/cometbft/cometbft/abci/types" 21 "github.com/cosmos/iavl" 22 "github.com/klauspost/compress/zstd" 23 "github.com/palantir/stacktrace" 24) 25 26const snapshotChunkSize = 10 * 1024 * 1024 // 10 MB 27const snapshotChunkHashSize = 32 28 29// ListSnapshots implements [types.Application]. 30func (d *DIDPLCApplication) ListSnapshots(context.Context, *abcitypes.RequestListSnapshots) (*abcitypes.ResponseListSnapshots, error) { 31 files, err := filepath.Glob(filepath.Join(d.snapshotDirectory, "*.snapshot")) 32 if err != nil { 33 return nil, stacktrace.Propagate(err, "") 34 } 35 36 snapshots := make([]*abcitypes.Snapshot, 0, len(files)) 37 for _, filename := range files { 38 s, err := readSnapshotMetadata(filename) 39 if err != nil { 40 return nil, stacktrace.Propagate(err, "") 41 } 42 43 snapshots = append(snapshots, s) 44 } 45 46 return &abcitypes.ResponseListSnapshots{ 47 Snapshots: snapshots, 48 }, nil 49} 50 51func readSnapshotMetadata(filename string) (*abcitypes.Snapshot, error) { 52 // Extract height from filename pattern: %020d.snapshot 53 base := filepath.Base(filename) 54 if !strings.HasSuffix(base, ".snapshot") { 55 return nil, stacktrace.NewError("invalid snapshot filename format: %s", filename) 56 } 57 heightStr := strings.TrimSuffix(base, ".snapshot") 58 height, err := strconv.ParseInt(heightStr, 10, 64) 59 if err != nil { 60 return nil, stacktrace.Propagate(err, "failed to parse height from filename: %s", filename) 61 } 62 63 // Open and read snapshot file header 64 f, err := os.Open(filename) 65 if err != nil { 66 return nil, stacktrace.Propagate(err, "failed to open snapshot file: %s", filename) 67 } 68 defer f.Close() 69 70 // Read file magic (18 bytes) 71 magic := make([]byte, 18) 72 _, err = io.ReadFull(f, magic) 73 if err != nil { 74 return nil, stacktrace.Propagate(err, "failed to read file magic") 75 } 76 if string(magic) != "didplcbft-snapshot" { 77 return nil, stacktrace.NewError("invalid file magic: expected 'didplcbft-snapshot', got '%s'", string(magic)) 78 } 79 80 // Read version bytes (6 bytes) 81 versionBytes := make([]byte, 6) 82 _, err = io.ReadFull(f, versionBytes) 83 if err != nil { 84 return nil, stacktrace.Propagate(err, "failed to read version bytes") 85 } 86 format := binary.BigEndian.Uint32(versionBytes[2:]) 87 88 // Read height (8 bytes, big-endian) 89 heightBytes := make([]byte, 8) 90 _, err = io.ReadFull(f, heightBytes) 91 if err != nil { 92 return nil, stacktrace.Propagate(err, "failed to read height") 93 } 94 fileHeight := int64(binary.BigEndian.Uint64(heightBytes)) 95 if fileHeight != height { 96 return nil, stacktrace.NewError("height mismatch: filename indicates %d, file header contains %d", height, fileHeight) 97 } 98 99 // Read tree hash (32 bytes) 100 hash := make([]byte, 32) 101 _, err = io.ReadFull(f, hash) 102 if err != nil { 103 return nil, stacktrace.Propagate(err, "failed to read tree hash") 104 } 105 106 // Read corresponding chunksums file 107 chunksumsFilename := strings.TrimSuffix(filename, ".snapshot") + ".chunksums" 108 chunksumsData, err := os.ReadFile(chunksumsFilename) 109 if err != nil { 110 return nil, stacktrace.Propagate(err, "failed to read chunksums file: %s", chunksumsFilename) 111 } 112 113 // Calculate number of chunks (each chunk hash is 32 bytes) 114 chunks := int64(len(chunksumsData)) / snapshotChunkHashSize 115 116 return &abcitypes.Snapshot{ 117 Height: uint64(height), 118 Format: format, 119 Chunks: uint32(chunks), 120 Hash: hash, 121 Metadata: chunksumsData, 122 }, nil 123} 124 125// LoadSnapshotChunk implements [types.Application]. 126func (d *DIDPLCApplication) LoadSnapshotChunk(_ context.Context, req *abcitypes.RequestLoadSnapshotChunk) (*abcitypes.ResponseLoadSnapshotChunk, error) { 127 if req.Format != 1 { 128 // just in case CometBFT asks us to load a chunk of a format we didn't declare to support in ListSnapshots... 129 return nil, stacktrace.NewError("unsupported snapshot format") 130 } 131 132 // Construct filename from height using the same pattern as createSnapshot 133 snapshotFilename := filepath.Join(d.snapshotDirectory, fmt.Sprintf("%020d.snapshot", req.Height)) 134 135 // Open the snapshot file 136 f, err := os.Open(snapshotFilename) 137 if err != nil { 138 return nil, stacktrace.Propagate(err, "failed to open snapshot file: %s", snapshotFilename) 139 } 140 defer f.Close() 141 142 // Calculate the offset for the requested chunk (start from beginning of file, including header) 143 offset := int64(req.Chunk) * snapshotChunkSize 144 _, err = f.Seek(offset, io.SeekStart) 145 if err != nil { 146 return nil, stacktrace.Propagate(err, "failed to seek to chunk offset") 147 } 148 149 // Read up to snapshotChunkSize bytes 150 chunkData := make([]byte, snapshotChunkSize) 151 n, err := f.Read(chunkData) 152 if err != nil && err != io.EOF { 153 return nil, stacktrace.Propagate(err, "failed to read chunk data") 154 } 155 156 // If we read less than snapshotChunkSize, trim the slice 157 if n < snapshotChunkSize { 158 chunkData = chunkData[:n] 159 } 160 161 return &abcitypes.ResponseLoadSnapshotChunk{ 162 Chunk: chunkData, 163 }, nil 164} 165 166// ApplySnapshotChunk implements [types.Application]. 167func (d *DIDPLCApplication) ApplySnapshotChunk(_ context.Context, req *abcitypes.RequestApplySnapshotChunk) (*abcitypes.ResponseApplySnapshotChunk, error) { 168 if d.snapshotApplier == nil { 169 return nil, stacktrace.NewError("snapshot not offered yet, can't apply chunk") 170 } 171 172 err := d.snapshotApplier.Apply(int(req.Index), req.Chunk) 173 if err != nil { 174 if errors.Is(err, errMalformedChunk) { 175 return &abcitypes.ResponseApplySnapshotChunk{ 176 Result: abcitypes.ResponseApplySnapshotChunk_RETRY, 177 RefetchChunks: []uint32{req.Index}, 178 RejectSenders: []string{req.Sender}, 179 }, nil 180 } else if errors.Is(err, errTreeHashMismatch) { 181 return &abcitypes.ResponseApplySnapshotChunk{ 182 Result: abcitypes.ResponseApplySnapshotChunk_REJECT_SNAPSHOT, 183 RejectSenders: []string{req.Sender}, 184 }, nil 185 } 186 return nil, stacktrace.NewError("failed to apply") 187 } 188 189 if d.snapshotApplier.Done() { 190 d.snapshotApplier = nil 191 err := d.plc.RefreshTreeData() 192 if err != nil { 193 return nil, stacktrace.Propagate(err, "") 194 } 195 } 196 197 return &abcitypes.ResponseApplySnapshotChunk{ 198 Result: abcitypes.ResponseApplySnapshotChunk_ACCEPT, 199 }, nil 200} 201 202// OfferSnapshot implements [types.Application]. 203func (d *DIDPLCApplication) OfferSnapshot(_ context.Context, req *abcitypes.RequestOfferSnapshot) (*abcitypes.ResponseOfferSnapshot, error) { 204 if d.snapshotApplier != nil { 205 err := d.snapshotApplier.Abort() 206 if err != nil { 207 return nil, stacktrace.Propagate(err, "") 208 } 209 } 210 211 if req.Snapshot.Format != 1 { 212 return &abcitypes.ResponseOfferSnapshot{ 213 Result: abcitypes.ResponseOfferSnapshot_REJECT_FORMAT, 214 }, nil 215 } 216 217 var err error 218 d.snapshotApplier, err = d.beginApplyingSnapshot(int64(req.Snapshot.Height), req.AppHash, int(req.Snapshot.Chunks), req.Snapshot.Metadata) 219 if err != nil { 220 d.snapshotApplier = nil 221 if errors.Is(err, errInvalidMetadata) { 222 return &abcitypes.ResponseOfferSnapshot{ 223 Result: abcitypes.ResponseOfferSnapshot_REJECT_SENDER, 224 }, nil 225 } 226 return nil, stacktrace.Propagate(err, "") 227 } 228 229 return &abcitypes.ResponseOfferSnapshot{ 230 Result: abcitypes.ResponseOfferSnapshot_ACCEPT, 231 }, nil 232} 233 234func (d *DIDPLCApplication) createSnapshot(treeVersion int64, tempFilename string) error { 235 it, err := d.tree.GetImmutable(treeVersion) 236 if err != nil { 237 return stacktrace.Propagate(err, "") 238 } 239 240 // Delete tempFilename if it exists to ensure a fresh file is created 241 _ = os.Remove(tempFilename) 242 243 f, err := os.Create(tempFilename) 244 if err != nil { 245 return stacktrace.Propagate(err, "") 246 } 247 defer f.Close() 248 249 st := time.Now() 250 251 err = writeSnapshot(f, it) 252 if err != nil { 253 return stacktrace.Propagate(err, "") 254 } 255 256 err = f.Sync() 257 if err != nil { 258 return stacktrace.Propagate(err, "") 259 } 260 261 hf, err := os.Create(filepath.Join(d.snapshotDirectory, fmt.Sprintf("%020d.chunksums", treeVersion))) 262 if err != nil { 263 return stacktrace.Propagate(err, "") 264 } 265 defer hf.Close() 266 267 err = writeChunkHashes(f, hf) 268 if err != nil { 269 return stacktrace.Propagate(err, "") 270 } 271 272 err = hf.Sync() 273 if err != nil { 274 return stacktrace.Propagate(err, "") 275 } 276 277 err = f.Close() 278 if err != nil { 279 return stacktrace.Propagate(err, "") 280 } 281 282 os.Rename(tempFilename, filepath.Join(d.snapshotDirectory, fmt.Sprintf("%020d.snapshot", treeVersion))) 283 284 fmt.Println("Took", time.Since(st), "to export") 285 286 return nil 287} 288 289func writeSnapshot(writerSeeker io.WriteSeeker, it *iavl.ImmutableTree) error { 290 writtenUntilReservedFields := 0 291 292 bw := bufio.NewWriter(writerSeeker) 293 294 // file magic and version 295 c, err := bw.Write([]byte("didplcbft-snapshot")) 296 if err != nil { 297 return stacktrace.Propagate(err, "") 298 } 299 writtenUntilReservedFields += c 300 301 c, err = bw.Write([]byte{0, 0, 0, 0, 0, 1}) 302 if err != nil { 303 return stacktrace.Propagate(err, "") 304 } 305 writtenUntilReservedFields += c 306 307 b := make([]byte, 8) 308 binary.BigEndian.PutUint64(b, uint64(it.Version())) 309 c, err = bw.Write(b) 310 if err != nil { 311 return stacktrace.Propagate(err, "") 312 } 313 writtenUntilReservedFields += c 314 315 c, err = bw.Write(it.Hash()) 316 if err != nil { 317 return stacktrace.Propagate(err, "") 318 } 319 writtenUntilReservedFields += c 320 321 // reserve space for writing number of bytes, number of nodes 322 // 8 bytes for node list size in bytes 323 // 8 bytes for number of nodes 324 sizeOfReservedFields := 8 + 8 325 b = make([]byte, sizeOfReservedFields) 326 _, err = bw.Write(b) 327 if err != nil { 328 return stacktrace.Propagate(err, "") 329 } 330 331 zstdw, err := zstd.NewWriter(bw, zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) 332 if err != nil { 333 return stacktrace.Propagate(err, "") 334 } 335 336 numNodes, err := exportNodes(it, zstdw) 337 if err != nil { 338 return stacktrace.Propagate(err, "") 339 } 340 341 err = zstdw.Close() 342 if err != nil { 343 return stacktrace.Propagate(err, "") 344 } 345 346 err = bw.Flush() 347 if err != nil { 348 return stacktrace.Propagate(err, "") 349 } 350 351 // find total compressed node list file size 352 offset, err := writerSeeker.Seek(0, io.SeekCurrent) 353 if err != nil { 354 return stacktrace.Propagate(err, "") 355 } 356 compressedNodeListSize := offset - int64(writtenUntilReservedFields) - int64(sizeOfReservedFields) 357 358 // seek back and write empty header fields 359 360 offset, err = writerSeeker.Seek(int64(writtenUntilReservedFields), io.SeekStart) 361 if err != nil { 362 return stacktrace.Propagate(err, "") 363 } 364 if offset != int64(writtenUntilReservedFields) { 365 return stacktrace.NewError("unexpected seek result") 366 } 367 368 b = make([]byte, sizeOfReservedFields) 369 binary.BigEndian.PutUint64(b, uint64(compressedNodeListSize)) 370 binary.BigEndian.PutUint64(b[8:], uint64(numNodes)) 371 _, err = writerSeeker.Write(b) 372 if err != nil { 373 return stacktrace.Propagate(err, "") 374 } 375 376 return nil 377} 378 379func exportNodes(it *iavl.ImmutableTree, w io.Writer) (int64, error) { 380 exporter, err := it.Export() 381 if err != nil { 382 return 0, stacktrace.Propagate(err, "") 383 } 384 defer exporter.Close() 385 cexporter := iavl.NewCompressExporter(exporter) 386 387 numNodes := int64(0) 388 for { 389 node, err := cexporter.Next() 390 if errors.Is(err, iavl.ErrorExportDone) { 391 break 392 } 393 if err != nil { 394 return 0, stacktrace.Propagate(err, "") 395 } 396 397 b := make([]byte, 9) 398 b[0] = byte(node.Height) 399 400 binary.BigEndian.PutUint64(b[1:], uint64(node.Version)) 401 _, err = w.Write(b) 402 if err != nil { 403 return 0, stacktrace.Propagate(err, "") 404 } 405 406 // nil node values are different from 0-byte values 407 b = []byte{0xff, 0xff, 0xff, 0xff} 408 if node.Key != nil { 409 binary.BigEndian.PutUint32(b, uint32(len(node.Key))) 410 } 411 _, err = w.Write(b) 412 if err != nil { 413 return 0, stacktrace.Propagate(err, "") 414 } 415 416 b = []byte{0xff, 0xff, 0xff, 0xff} 417 if node.Value != nil { 418 binary.BigEndian.PutUint32(b, uint32(len(node.Value))) 419 } 420 _, err = w.Write(b) 421 if err != nil { 422 return 0, stacktrace.Propagate(err, "") 423 } 424 425 _, err = w.Write(node.Key) 426 if err != nil { 427 return 0, stacktrace.Propagate(err, "") 428 } 429 430 _, err = w.Write(node.Value) 431 if err != nil { 432 return 0, stacktrace.Propagate(err, "") 433 } 434 numNodes++ 435 } 436 437 return numNodes, nil 438} 439 440func writeChunkHashes(snapshotFile io.ReadSeeker, w io.Writer) error { 441 bw := bufio.NewWriter(w) 442 defer bw.Flush() 443 444 _, err := snapshotFile.Seek(0, io.SeekStart) 445 if err != nil { 446 return stacktrace.Propagate(err, "") 447 } 448 449 buf := make([]byte, snapshotChunkSize) 450 for { 451 n, err := io.ReadFull(snapshotFile, buf) 452 if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { 453 return stacktrace.Propagate(err, "") 454 } 455 if n == 0 { 456 break 457 } 458 459 hash := sha256.Sum256(buf[:n]) 460 c, err := w.Write(hash[:]) 461 if err != nil { 462 return stacktrace.Propagate(err, "") 463 } 464 465 if c != snapshotChunkHashSize { 466 return stacktrace.NewError("unexpected chunk hash size") 467 } 468 469 if n < snapshotChunkSize { 470 break 471 } 472 } 473 474 return nil 475} 476 477type snapshotApplier struct { 478 tree *iavl.MutableTree 479 treeVersion int64 480 expectedFinalHash []byte 481 expectedChunkHashes [][]byte 482 483 pipeWriter *io.PipeWriter 484 pipeReader *io.PipeReader 485 zstdReader io.ReadCloser 486 487 importer *iavl.Importer 488 compressImporter iavl.NodeImporter 489 importerWg sync.WaitGroup 490 491 numImportedNodes int 492 claimedNodeCount int 493 done bool 494} 495 496var errMalformedChunk = errors.New("malformed chunk") 497var errInvalidMetadata = errors.New("invalid metadata") 498var errTreeHashMismatch = errors.New("tree hash mismatch") 499 500func (d *DIDPLCApplication) beginApplyingSnapshot(treeVersion int64, expectedFinalHash []byte, expectedNumChunks int, chunksums []byte) (*snapshotApplier, error) { 501 if len(chunksums)%snapshotChunkHashSize != 0 || len(chunksums)/snapshotChunkHashSize != expectedNumChunks { 502 return nil, stacktrace.Propagate(errInvalidMetadata, "") 503 } 504 505 if !d.tree.IsEmpty() { 506 err := d.fullyClearTree() 507 if err != nil { 508 return nil, stacktrace.Propagate(err, "") 509 } 510 } 511 512 importer, err := d.tree.Import(treeVersion) 513 if err != nil { 514 return nil, stacktrace.Propagate(err, "") 515 } 516 517 pipeReader, pipeWriter := io.Pipe() 518 519 zstdReader, err := zstd.NewReader(pipeReader) 520 if err != nil { 521 return nil, stacktrace.Propagate(err, "") 522 } 523 524 chunkHashes := make([][]byte, 0, expectedNumChunks) 525 for hash := range slices.Chunk(chunksums, snapshotChunkHashSize) { 526 chunkHashes = append(chunkHashes, hash) 527 } 528 529 return &snapshotApplier{ 530 tree: d.tree, 531 treeVersion: treeVersion, 532 expectedFinalHash: expectedFinalHash, 533 expectedChunkHashes: chunkHashes, 534 535 pipeWriter: pipeWriter, 536 pipeReader: pipeReader, 537 zstdReader: zstdReader.IOReadCloser(), 538 539 importer: importer, 540 compressImporter: iavl.NewCompressImporter(importer), 541 }, nil 542} 543 544func (a *snapshotApplier) Apply(chunkIndex int, chunkBytes []byte) error { 545 if len(chunkBytes) > snapshotChunkSize { 546 return stacktrace.Propagate(errMalformedChunk, "chunk too large") 547 } 548 hash := sha256.Sum256(chunkBytes) 549 if !bytes.Equal(a.expectedChunkHashes[chunkIndex], hash[:]) { 550 return stacktrace.Propagate(errMalformedChunk, "hash mismatch") 551 } 552 553 if chunkIndex == 0 { 554 if len(chunkBytes) < 80 { 555 return stacktrace.Propagate(errMalformedChunk, "chunk too small") 556 } 557 558 if string(chunkBytes[0:18]) != "didplcbft-snapshot" { 559 return stacktrace.Propagate(errMalformedChunk, "invalid file magic") 560 } 561 562 if binary.BigEndian.Uint32(chunkBytes[20:]) != 1 { 563 return stacktrace.Propagate(errMalformedChunk, "invalid snapshot format") 564 } 565 566 if binary.BigEndian.Uint64(chunkBytes[24:]) != uint64(a.treeVersion) { 567 return stacktrace.Propagate(errMalformedChunk, "mismatched tree version") 568 } 569 570 if !bytes.Equal(chunkBytes[32:64], a.expectedFinalHash) { 571 return stacktrace.Propagate(errMalformedChunk, "mismatched declared tree hash") 572 } 573 574 declaredFileSize := 80 + binary.BigEndian.Uint64(chunkBytes[64:]) 575 minExpectedSize := uint64((len(a.expectedChunkHashes) - 1) * snapshotChunkSize) 576 maxExpectedSize := uint64(len(a.expectedChunkHashes) * snapshotChunkSize) 577 if declaredFileSize < minExpectedSize || 578 declaredFileSize > maxExpectedSize { 579 return stacktrace.Propagate(errMalformedChunk, "unexpected compressed node list length") 580 } 581 582 a.claimedNodeCount = int(binary.BigEndian.Uint64(chunkBytes[72:])) 583 584 // move to the start of the compressed portion 585 chunkBytes = chunkBytes[80:] 586 587 a.importerWg.Go(a.streamingImporter) 588 } 589 590 isLastChunk := chunkIndex == len(a.expectedChunkHashes)-1 591 go func(b []byte) { 592 // From the docs: 593 // It is safe to call Read and Write in parallel with each other or with Close. 594 // Parallel calls to Read and parallel calls to Write are also safe: 595 // the individual calls will be gated sequentially. 596 597 // so even if not everything gets written from this chunk (e.g. because the zstd decoder decided not to advance) 598 // it'll eventually be written, in the correct order 599 _, _ = a.pipeWriter.Write(b) 600 if isLastChunk { 601 _ = a.pipeWriter.Close() 602 } 603 }(chunkBytes) 604 605 if isLastChunk { 606 // wait for importer to finish reading and importing everything 607 a.importerWg.Wait() 608 609 if a.numImportedNodes != a.claimedNodeCount { 610 return stacktrace.Propagate(errTreeHashMismatch, "imported node count mismatch") 611 } 612 613 err := a.importer.Commit() 614 if err != nil { 615 if strings.Contains(err.Error(), "invalid node structure") { 616 return stacktrace.Propagate(errors.Join(errMalformedChunk, err), "") 617 } 618 return stacktrace.Propagate(err, "") 619 } 620 621 a.closeCommons() 622 a.done = true 623 624 if !bytes.Equal(a.tree.Hash(), a.expectedFinalHash) { 625 return stacktrace.Propagate(errTreeHashMismatch, "") 626 } 627 } 628 629 return nil 630} 631 632func (a *snapshotApplier) streamingImporter() { 633 for { 634 nodeHeader := make([]byte, 9+4+4) 635 n, err := io.ReadFull(a.zstdReader, nodeHeader) 636 if err != nil || n != 9+4+4 { 637 // err may be EOF here, which is expected 638 return 639 } 640 641 // validate lengths against sensible limits to prevent OOM DoS by malicious third parties 642 keyLength := binary.BigEndian.Uint32(nodeHeader[9:13]) 643 var key []byte 644 if keyLength != 0xffffffff { 645 if keyLength > 1024*1024 { 646 return 647 } 648 key = make([]byte, keyLength) 649 650 n, err = io.ReadFull(a.zstdReader, key) 651 if err != nil || n != len(key) { 652 // this shouldn't happen unless the data is corrupt 653 // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 654 return 655 } 656 } 657 658 valueLength := binary.BigEndian.Uint32(nodeHeader[13:17]) 659 var value []byte 660 if valueLength != 0xffffffff { 661 if valueLength > 1024*1024 { 662 return 663 } 664 value = make([]byte, valueLength) 665 n, err = io.ReadFull(a.zstdReader, value) 666 if err != nil || n != len(value) { 667 return 668 } 669 } 670 671 err = a.compressImporter.Add(&iavl.ExportNode{ 672 Height: int8(nodeHeader[0]), 673 Version: int64(binary.BigEndian.Uint64(nodeHeader[1:9])), 674 Key: key, 675 Value: value, 676 }) 677 if err != nil { 678 // this shouldn't happen unless the data is corrupt 679 // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 680 return 681 } 682 a.numImportedNodes++ 683 } 684} 685 686func (a *snapshotApplier) Abort() error { 687 err := a.closeCommons() 688 if err != nil { 689 return stacktrace.Propagate(err, "") 690 } 691 692 err = a.tree.DeleteVersionsFrom(0) 693 if err != nil { 694 return stacktrace.Propagate(err, "") 695 } 696 697 return nil 698} 699 700func (a *snapshotApplier) closeCommons() error { 701 err := a.zstdReader.Close() 702 if err != nil { 703 return stacktrace.Propagate(err, "") 704 } 705 706 err = a.pipeReader.Close() 707 if err != nil { 708 return stacktrace.Propagate(err, "") 709 } 710 711 err = a.pipeWriter.Close() 712 if err != nil { 713 return stacktrace.Propagate(err, "") 714 } 715 716 a.importerWg.Wait() 717 a.importer.Close() 718 719 return nil 720} 721 722func (a *snapshotApplier) Done() bool { 723 return a.done 724}