A very experimental PLC implementation which uses BFT consensus for decentralization
1package abciapp
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "path/filepath"
8 "sync"
9 "time"
10
11 dbm "github.com/cometbft/cometbft-db"
12 abcitypes "github.com/cometbft/cometbft/abci/types"
13 "github.com/cosmos/iavl"
14 "github.com/palantir/stacktrace"
15 "github.com/samber/lo"
16 "tangled.org/gbl08ma.com/didplcbft/dbadapter"
17 "tangled.org/gbl08ma.com/didplcbft/plc"
18 "tangled.org/gbl08ma.com/didplcbft/store"
19 "tangled.org/gbl08ma.com/didplcbft/transaction"
20)
21
22type DIDPLCApplication struct {
23 runnerContext context.Context
24 plc plc.PLC
25 txFactory *transaction.Factory
26 tree *iavl.MutableTree
27 fullyClearApplicationData func() error
28
29 ongoingRead transaction.Read
30 ongoingWrite transaction.Write
31
32 snapshotDirectory string
33 snapshotApplier *snapshotApplier
34
35 lastProcessedProposalHash []byte
36 lastProcessedProposalExecTxResults []*processResult
37
38 aocsByPLC map[string]*authoritativeOperationsCache
39}
40
41// store and plc must be able to share transaction objects
42func NewDIDPLCApplication(treeDB dbm.DB, indexDB dbm.DB, clearData func() (dbm.DB, dbm.DB), snapshotDirectory string) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) {
43 mkTree := func() *iavl.MutableTree {
44 return iavl.NewMutableTree(dbadapter.Adapt(treeDB), 2048, false, iavl.NewNopLogger(), iavl.AsyncPruningOption(false))
45 }
46
47 tree := mkTree()
48
49 _, err := tree.Load()
50 if err != nil {
51 return nil, nil, nil, func() {}, stacktrace.Propagate(err, "error loading latest version of the tree from storage")
52 }
53
54 if snapshotDirectory != "" {
55 err = os.MkdirAll(snapshotDirectory, os.FileMode(0755))
56 if err != nil {
57 return nil, nil, nil, func() {}, stacktrace.Propagate(err, "")
58 }
59 }
60
61 d := &DIDPLCApplication{
62 runnerContext: context.Background(),
63 tree: tree,
64 txFactory: transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence),
65 snapshotDirectory: snapshotDirectory,
66 aocsByPLC: make(map[string]*authoritativeOperationsCache),
67 }
68
69 d.fullyClearApplicationData = func() error {
70 // we assume this is called in a single-threaded context, which should be a safe assumption since we'll only call this during snapshot import
71 // and CometBFT only calls one ABCI method at a time
72 err := d.tree.Close()
73 if err != nil {
74 return stacktrace.Propagate(err, "")
75 }
76
77 treeDB, indexDB = clearData()
78
79 *d.tree = *mkTree()
80
81 d.txFactory = transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence)
82 return nil
83 }
84
85 d.plc = plc.NewPLC()
86
87 lastSnapshotVersion := tree.Version()
88
89 var wg sync.WaitGroup
90 closeCh := make(chan struct{})
91 wg.Go(func() {
92 for {
93 select {
94 case <-closeCh:
95 return
96 case <-time.After(5 * time.Minute):
97 }
98 treeVersion := tree.Version()
99 if treeVersion > int64(lastSnapshotVersion+10000) {
100 err = d.createSnapshot(treeVersion, filepath.Join(snapshotDirectory, "snapshot.tmp"))
101 if err != nil {
102 fmt.Println("FAILED TO TAKE SNAPSHOT", stacktrace.Propagate(err, ""))
103 }
104 fmt.Println("TOOK SNAPSHOT OF VERSION", treeVersion)
105 lastSnapshotVersion = treeVersion
106 }
107 }
108
109 })
110
111 /*err = d.createSnapshot(tree.Version(), filepath.Join(snapshotDirectory, "snapshot.tmp"))
112 if err != nil {
113 return nil, nil, func() {}, stacktrace.Propagate(err, "")
114 }*/
115
116 /*
117 tree2 := iavl.NewMutableTree(dbm.NewMemDB(), 2048, false, iavl.NewNopLogger())
118 importer, err := tree2.Import(tree.Version())
119 if err != nil {
120 return nil, nil, func() {}, stacktrace.Propagate(err, "")
121 }
122 cimporter := iavl.NewCompressImporter(importer)
123
124 st = time.Now()
125 for _, node := range nodes {
126 err := cimporter.Add(&node)
127 if err != nil {
128 return nil, nil, func() {}, stacktrace.Propagate(err, "")
129 }
130 }
131 err = importer.Commit()
132 if err != nil {
133 return nil, nil, func() {}, stacktrace.Propagate(err, "")
134 }
135
136 fmt.Println("Took", time.Since(st), "to import", len(nodes), "nodes")
137 fmt.Println("Imported tree hash", hex.EncodeToString(tree2.Hash()), "and version", tree2.Version())
138 */
139
140 return d, d.txFactory, d.plc, func() {
141 closeCh <- struct{}{}
142 wg.Wait()
143 lo.Must0(tree.Close())
144 }, nil
145}
146
147var _ abcitypes.Application = (*DIDPLCApplication)(nil)
148
149func (d *DIDPLCApplication) DiscardChanges() {
150 if d.ongoingWrite != nil {
151 d.ongoingWrite.Rollback()
152 }
153 d.ongoingWrite = nil
154 d.ongoingRead = nil
155}