A very experimental PLC implementation which uses BFT consensus for decentralization
at main 4.4 kB view raw
1package abciapp 2 3import ( 4 "context" 5 "fmt" 6 "os" 7 "path/filepath" 8 "sync" 9 "time" 10 11 dbm "github.com/cometbft/cometbft-db" 12 abcitypes "github.com/cometbft/cometbft/abci/types" 13 "github.com/cosmos/iavl" 14 "github.com/palantir/stacktrace" 15 "github.com/samber/lo" 16 "tangled.org/gbl08ma.com/didplcbft/dbadapter" 17 "tangled.org/gbl08ma.com/didplcbft/plc" 18 "tangled.org/gbl08ma.com/didplcbft/store" 19 "tangled.org/gbl08ma.com/didplcbft/transaction" 20) 21 22type DIDPLCApplication struct { 23 runnerContext context.Context 24 plc plc.PLC 25 txFactory *transaction.Factory 26 tree *iavl.MutableTree 27 fullyClearApplicationData func() error 28 29 ongoingRead transaction.Read 30 ongoingWrite transaction.Write 31 32 snapshotDirectory string 33 snapshotApplier *snapshotApplier 34 35 lastProcessedProposalHash []byte 36 lastProcessedProposalExecTxResults []*processResult 37 38 aocsByPLC map[string]*authoritativeOperationsCache 39} 40 41// store and plc must be able to share transaction objects 42func NewDIDPLCApplication(treeDB dbm.DB, indexDB dbm.DB, clearData func() (dbm.DB, dbm.DB), snapshotDirectory string) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 43 mkTree := func() *iavl.MutableTree { 44 return iavl.NewMutableTree(dbadapter.Adapt(treeDB), 2048, false, iavl.NewNopLogger(), iavl.AsyncPruningOption(false)) 45 } 46 47 tree := mkTree() 48 49 _, err := tree.Load() 50 if err != nil { 51 return nil, nil, nil, func() {}, stacktrace.Propagate(err, "error loading latest version of the tree from storage") 52 } 53 54 if snapshotDirectory != "" { 55 err = os.MkdirAll(snapshotDirectory, os.FileMode(0755)) 56 if err != nil { 57 return nil, nil, nil, func() {}, stacktrace.Propagate(err, "") 58 } 59 } 60 61 d := &DIDPLCApplication{ 62 runnerContext: context.Background(), 63 tree: tree, 64 txFactory: transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence), 65 snapshotDirectory: snapshotDirectory, 66 aocsByPLC: make(map[string]*authoritativeOperationsCache), 67 } 68 69 d.fullyClearApplicationData = func() error { 70 // we assume this is called in a single-threaded context, which should be a safe assumption since we'll only call this during snapshot import 71 // and CometBFT only calls one ABCI method at a time 72 err := d.tree.Close() 73 if err != nil { 74 return stacktrace.Propagate(err, "") 75 } 76 77 treeDB, indexDB = clearData() 78 79 *d.tree = *mkTree() 80 81 d.txFactory = transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence) 82 return nil 83 } 84 85 d.plc = plc.NewPLC() 86 87 lastSnapshotVersion := tree.Version() 88 89 var wg sync.WaitGroup 90 closeCh := make(chan struct{}) 91 wg.Go(func() { 92 for { 93 select { 94 case <-closeCh: 95 return 96 case <-time.After(5 * time.Minute): 97 } 98 treeVersion := tree.Version() 99 if treeVersion > int64(lastSnapshotVersion+10000) { 100 err = d.createSnapshot(treeVersion, filepath.Join(snapshotDirectory, "snapshot.tmp")) 101 if err != nil { 102 fmt.Println("FAILED TO TAKE SNAPSHOT", stacktrace.Propagate(err, "")) 103 } 104 fmt.Println("TOOK SNAPSHOT OF VERSION", treeVersion) 105 lastSnapshotVersion = treeVersion 106 } 107 } 108 109 }) 110 111 /*err = d.createSnapshot(tree.Version(), filepath.Join(snapshotDirectory, "snapshot.tmp")) 112 if err != nil { 113 return nil, nil, func() {}, stacktrace.Propagate(err, "") 114 }*/ 115 116 /* 117 tree2 := iavl.NewMutableTree(dbm.NewMemDB(), 2048, false, iavl.NewNopLogger()) 118 importer, err := tree2.Import(tree.Version()) 119 if err != nil { 120 return nil, nil, func() {}, stacktrace.Propagate(err, "") 121 } 122 cimporter := iavl.NewCompressImporter(importer) 123 124 st = time.Now() 125 for _, node := range nodes { 126 err := cimporter.Add(&node) 127 if err != nil { 128 return nil, nil, func() {}, stacktrace.Propagate(err, "") 129 } 130 } 131 err = importer.Commit() 132 if err != nil { 133 return nil, nil, func() {}, stacktrace.Propagate(err, "") 134 } 135 136 fmt.Println("Took", time.Since(st), "to import", len(nodes), "nodes") 137 fmt.Println("Imported tree hash", hex.EncodeToString(tree2.Hash()), "and version", tree2.Version()) 138 */ 139 140 return d, d.txFactory, d.plc, func() { 141 closeCh <- struct{}{} 142 wg.Wait() 143 lo.Must0(tree.Close()) 144 }, nil 145} 146 147var _ abcitypes.Application = (*DIDPLCApplication)(nil) 148 149func (d *DIDPLCApplication) DiscardChanges() { 150 if d.ongoingWrite != nil { 151 d.ongoingWrite.Rollback() 152 } 153 d.ongoingWrite = nil 154 d.ongoingRead = nil 155}