A very experimental PLC implementation which uses BFT consensus for decentralization
1package abciapp
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "slices"
8 "strings"
9 "time"
10
11 abcitypes "github.com/cometbft/cometbft/abci/types"
12 cbornode "github.com/ipfs/go-ipld-cbor"
13 "github.com/palantir/stacktrace"
14 "github.com/samber/lo"
15)
16
17// InitChain implements [types.Application].
18func (d *DIDPLCApplication) InitChain(context.Context, *abcitypes.RequestInitChain) (*abcitypes.ResponseInitChain, error) {
19 // TODO
20 return &abcitypes.ResponseInitChain{}, nil
21}
22
23// PrepareProposal implements [types.Application].
24func (d *DIDPLCApplication) PrepareProposal(ctx context.Context, req *abcitypes.RequestPrepareProposal) (*abcitypes.ResponsePrepareProposal, error) {
25 defer d.DiscardChanges()
26
27 if req.Height == 2 {
28 tx := Transaction[SetAuthoritativePlcArguments]{
29 Action: TransactionActionSetAuthoritativePlc,
30 Arguments: SetAuthoritativePlcArguments{
31 PLCURL: "https://plc.directory",
32 RestartImport: true,
33 },
34 }
35
36 out, err := cbornode.DumpObject(tx)
37 if err != nil {
38 return nil, stacktrace.Propagate(err, "")
39 }
40
41 req.Txs = append(req.Txs, out)
42 }
43
44 st := time.Now()
45 acceptedTx := make([][]byte, 0, len(req.Txs))
46 toProcess := req.Txs
47 for {
48 toTryNext := [][]byte{}
49 for _, tx := range toProcess {
50 result, err := processTx(ctx, d.transactionProcessorDependencies(), tx, req.Time, true)
51 if err != nil {
52 return nil, stacktrace.Propagate(err, "")
53 }
54
55 if result.Code == 0 {
56 acceptedTx = append(acceptedTx, tx)
57 } else {
58 // if a transaction is invalid, it _might_ be because it depends on a transaction that's further up in the list
59 // process it after all the others
60 toTryNext = append(toTryNext, tx)
61 }
62 }
63 if len(toProcess) >= len(toTryNext) {
64 // we made no progress in this iteration - all transactions left to process fail to do so
65 // so they can't be depending on anything that would be included in this block, at this point
66 // just continue while dropping the transactions that would never succeed in this block
67 break
68 }
69 if time.Since(st) > 800*time.Millisecond {
70 // this is taking too long, just continue with what's already in acceptedTx
71 break
72 }
73 toProcess = toTryNext
74 }
75
76 maybeTx, err := d.maybeCreateAuthoritativeImportTx(ctx)
77 if err != nil {
78 // TODO don't fail absolutely silently always, we should at least check what the error is
79 //return nil, stacktrace.Propagate(err, "")
80 }
81
82 if err == nil && len(maybeTx) != 0 {
83 totalSize := lo.SumBy(acceptedTx, func(tx []byte) int { return len(tx) })
84 // 4 KB safety margin
85 if totalSize+len(maybeTx) < int(req.MaxTxBytes)-4096 {
86 // we have space to fit the import transaction
87
88 // set execute to false to save a lot of time
89 // (we trust that running the import will succeed, so just do bare minimum checks here)
90 result, err := processTx(ctx, d.transactionProcessorDependencies(), maybeTx, req.Time, false)
91 if err != nil {
92 return nil, stacktrace.Propagate(err, "")
93 }
94 if result.Code == 0 {
95 acceptedTx = append(acceptedTx, maybeTx)
96 }
97 }
98 }
99
100 return &abcitypes.ResponsePrepareProposal{Txs: acceptedTx}, nil
101}
102
103// ProcessProposal implements [types.Application].
104func (d *DIDPLCApplication) ProcessProposal(ctx context.Context, req *abcitypes.RequestProcessProposal) (*abcitypes.ResponseProcessProposal, error) {
105 // do not rollback tree in this method, in case the changes can be reused in FinalizeBlock
106 if req.Height != d.tree.WorkingVersion() {
107 // our tree went out of sync, this should never happen
108 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil
109 }
110
111 // if we return early, ensure we don't use incomplete results where we haven't voted ACCEPT
112 d.lastProcessedProposalHash = nil
113 d.lastProcessedProposalExecTxResults = nil
114 defer func() {
115 if d.lastProcessedProposalHash == nil {
116 // we didn't vote ACCEPT
117 // we could rollback only eventually on FinalizeBlock, but why wait - rollback now for safety
118 d.DiscardChanges()
119 }
120 }()
121
122 txResults := make([]*processResult, len(req.Txs))
123 for i, tx := range req.Txs {
124 result, action, processor, err := beginProcessTx(tx)
125 if err != nil {
126 return nil, stacktrace.Propagate(err, "")
127 }
128 if result.Code == 0 {
129 if action == TransactionActionAuthoritativeImport && i != len(req.Txs)-1 {
130 // if an Authoritative Import transaction is present on the block, it must be the last one
131 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil
132 }
133
134 st := time.Now()
135 result, err = finishProcessTx(ctx, d.transactionProcessorDependencies(), processor, tx, req.Time, true)
136 if err != nil {
137 return nil, stacktrace.Propagate(err, "")
138 }
139 fmt.Println("FINISHPROCESSTX TOOK", time.Since(st))
140 }
141
142 // when preparing a proposal, invalid transactions should have been discarded
143 // so, if something doesn't succeed now, something has gone wrong and we should not vote in agreement of the proposal
144 if result.Code != 0 {
145 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil
146 }
147
148 txResults[i] = result
149 }
150
151 d.lastProcessedProposalHash = slices.Clone(req.Hash)
152 d.lastProcessedProposalExecTxResults = txResults
153
154 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_ACCEPT}, nil
155}
156
157// ExtendVote implements [types.Application].
158func (d *DIDPLCApplication) ExtendVote(context.Context, *abcitypes.RequestExtendVote) (*abcitypes.ResponseExtendVote, error) {
159 // TODO
160 return &abcitypes.ResponseExtendVote{}, nil
161}
162
163// VerifyVoteExtension implements [types.Application].
164func (d *DIDPLCApplication) VerifyVoteExtension(context.Context, *abcitypes.RequestVerifyVoteExtension) (*abcitypes.ResponseVerifyVoteExtension, error) {
165 // TODO
166 return &abcitypes.ResponseVerifyVoteExtension{}, nil
167}
168
169// FinalizeBlock implements [types.Application].
170func (d *DIDPLCApplication) FinalizeBlock(ctx context.Context, req *abcitypes.RequestFinalizeBlock) (*abcitypes.ResponseFinalizeBlock, error) {
171 if bytes.Equal(req.Hash, d.lastProcessedProposalHash) && d.lastProcessedProposalExecTxResults != nil {
172 // the block that was decided was the one we processed in ProcessProposal, and ProcessProposal processed successfully
173 // reuse the uncommitted results
174 return &abcitypes.ResponseFinalizeBlock{
175 TxResults: lo.Map(d.lastProcessedProposalExecTxResults, func(result *processResult, _ int) *abcitypes.ExecTxResult {
176 return result.ToABCI()
177 }),
178 AppHash: d.tree.WorkingHash(),
179 }, nil
180 }
181 // a block other than the one we processed in ProcessProposal was decided
182 // discard the current modified state, and process the decided block
183 d.DiscardChanges()
184
185 txResults := make([]*processResult, len(req.Txs))
186 for i, tx := range req.Txs {
187 var err error
188 txResults[i], err = processTx(ctx, d.transactionProcessorDependencies(), tx, req.Time, true)
189 if err != nil {
190 return nil, stacktrace.Propagate(err, "")
191 }
192 }
193
194 d.lastProcessedProposalHash = slices.Clone(req.Hash)
195 d.lastProcessedProposalExecTxResults = txResults
196
197 return &abcitypes.ResponseFinalizeBlock{
198 TxResults: lo.Map(d.lastProcessedProposalExecTxResults, func(result *processResult, _ int) *abcitypes.ExecTxResult {
199 return result.ToABCI()
200 }),
201 AppHash: d.tree.WorkingHash(),
202 }, nil
203}
204
205// Commit implements [types.Application].
206func (d *DIDPLCApplication) Commit(context.Context, *abcitypes.RequestCommit) (*abcitypes.ResponseCommit, error) {
207 _, newVersion, err := d.tree.SaveVersion()
208 if err != nil {
209 return nil, stacktrace.Propagate(err, "")
210 }
211
212 for _, r := range d.lastProcessedProposalExecTxResults {
213 for _, cb := range r.commitSideEffects {
214 cb()
215 }
216 }
217
218 minHeightToKeep := max(newVersion-5, 0)
219 minVerToKeep := max(minHeightToKeep-5, 0)
220 if minVerToKeep > 0 {
221 err = d.tree.DeleteVersionsTo(minVerToKeep)
222 if err != nil && !strings.Contains(err.Error(), "active readers") {
223 return nil, stacktrace.Propagate(err, "")
224 }
225 }
226
227 return &abcitypes.ResponseCommit{
228 // TODO only discard actual blockchain history based on settings
229 //RetainHeight: minHeightToKeep,
230 }, nil
231}
232
233func (d *DIDPLCApplication) transactionProcessorDependencies() TransactionProcessorDependencies {
234 return TransactionProcessorDependencies{
235 runnerContext: d.runnerContext,
236 plc: d.plc,
237 tree: d,
238 aocsByPLC: d.aocsByPLC,
239 }
240}