A very experimental PLC implementation which uses BFT consensus for decentralization
at main 8.3 kB view raw
1package abciapp 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "slices" 8 "strings" 9 "time" 10 11 abcitypes "github.com/cometbft/cometbft/abci/types" 12 cbornode "github.com/ipfs/go-ipld-cbor" 13 "github.com/palantir/stacktrace" 14 "github.com/samber/lo" 15) 16 17// InitChain implements [types.Application]. 18func (d *DIDPLCApplication) InitChain(context.Context, *abcitypes.RequestInitChain) (*abcitypes.ResponseInitChain, error) { 19 // TODO 20 return &abcitypes.ResponseInitChain{}, nil 21} 22 23// PrepareProposal implements [types.Application]. 24func (d *DIDPLCApplication) PrepareProposal(ctx context.Context, req *abcitypes.RequestPrepareProposal) (*abcitypes.ResponsePrepareProposal, error) { 25 defer d.DiscardChanges() 26 27 if req.Height == 2 { 28 tx := Transaction[SetAuthoritativePlcArguments]{ 29 Action: TransactionActionSetAuthoritativePlc, 30 Arguments: SetAuthoritativePlcArguments{ 31 PLCURL: "https://plc.directory", 32 RestartImport: true, 33 }, 34 } 35 36 out, err := cbornode.DumpObject(tx) 37 if err != nil { 38 return nil, stacktrace.Propagate(err, "") 39 } 40 41 req.Txs = append(req.Txs, out) 42 } 43 44 st := time.Now() 45 acceptedTx := make([][]byte, 0, len(req.Txs)) 46 toProcess := req.Txs 47 for { 48 toTryNext := [][]byte{} 49 for _, tx := range toProcess { 50 result, err := processTx(ctx, d.transactionProcessorDependencies(), tx, req.Time, true) 51 if err != nil { 52 return nil, stacktrace.Propagate(err, "") 53 } 54 55 if result.Code == 0 { 56 acceptedTx = append(acceptedTx, tx) 57 } else { 58 // if a transaction is invalid, it _might_ be because it depends on a transaction that's further up in the list 59 // process it after all the others 60 toTryNext = append(toTryNext, tx) 61 } 62 } 63 if len(toProcess) >= len(toTryNext) { 64 // we made no progress in this iteration - all transactions left to process fail to do so 65 // so they can't be depending on anything that would be included in this block, at this point 66 // just continue while dropping the transactions that would never succeed in this block 67 break 68 } 69 if time.Since(st) > 800*time.Millisecond { 70 // this is taking too long, just continue with what's already in acceptedTx 71 break 72 } 73 toProcess = toTryNext 74 } 75 76 maybeTx, err := d.maybeCreateAuthoritativeImportTx(ctx) 77 if err != nil { 78 // TODO don't fail absolutely silently always, we should at least check what the error is 79 //return nil, stacktrace.Propagate(err, "") 80 } 81 82 if err == nil && len(maybeTx) != 0 { 83 totalSize := lo.SumBy(acceptedTx, func(tx []byte) int { return len(tx) }) 84 // 4 KB safety margin 85 if totalSize+len(maybeTx) < int(req.MaxTxBytes)-4096 { 86 // we have space to fit the import transaction 87 88 // set execute to false to save a lot of time 89 // (we trust that running the import will succeed, so just do bare minimum checks here) 90 result, err := processTx(ctx, d.transactionProcessorDependencies(), maybeTx, req.Time, false) 91 if err != nil { 92 return nil, stacktrace.Propagate(err, "") 93 } 94 if result.Code == 0 { 95 acceptedTx = append(acceptedTx, maybeTx) 96 } 97 } 98 } 99 100 return &abcitypes.ResponsePrepareProposal{Txs: acceptedTx}, nil 101} 102 103// ProcessProposal implements [types.Application]. 104func (d *DIDPLCApplication) ProcessProposal(ctx context.Context, req *abcitypes.RequestProcessProposal) (*abcitypes.ResponseProcessProposal, error) { 105 // do not rollback tree in this method, in case the changes can be reused in FinalizeBlock 106 if req.Height != d.tree.WorkingVersion() { 107 // our tree went out of sync, this should never happen 108 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 109 } 110 111 // if we return early, ensure we don't use incomplete results where we haven't voted ACCEPT 112 d.lastProcessedProposalHash = nil 113 d.lastProcessedProposalExecTxResults = nil 114 defer func() { 115 if d.lastProcessedProposalHash == nil { 116 // we didn't vote ACCEPT 117 // we could rollback only eventually on FinalizeBlock, but why wait - rollback now for safety 118 d.DiscardChanges() 119 } 120 }() 121 122 txResults := make([]*processResult, len(req.Txs)) 123 for i, tx := range req.Txs { 124 result, action, processor, err := beginProcessTx(tx) 125 if err != nil { 126 return nil, stacktrace.Propagate(err, "") 127 } 128 if result.Code == 0 { 129 if action == TransactionActionAuthoritativeImport && i != len(req.Txs)-1 { 130 // if an Authoritative Import transaction is present on the block, it must be the last one 131 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 132 } 133 134 st := time.Now() 135 result, err = finishProcessTx(ctx, d.transactionProcessorDependencies(), processor, tx, req.Time, true) 136 if err != nil { 137 return nil, stacktrace.Propagate(err, "") 138 } 139 fmt.Println("FINISHPROCESSTX TOOK", time.Since(st)) 140 } 141 142 // when preparing a proposal, invalid transactions should have been discarded 143 // so, if something doesn't succeed now, something has gone wrong and we should not vote in agreement of the proposal 144 if result.Code != 0 { 145 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 146 } 147 148 txResults[i] = result 149 } 150 151 d.lastProcessedProposalHash = slices.Clone(req.Hash) 152 d.lastProcessedProposalExecTxResults = txResults 153 154 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_ACCEPT}, nil 155} 156 157// ExtendVote implements [types.Application]. 158func (d *DIDPLCApplication) ExtendVote(context.Context, *abcitypes.RequestExtendVote) (*abcitypes.ResponseExtendVote, error) { 159 // TODO 160 return &abcitypes.ResponseExtendVote{}, nil 161} 162 163// VerifyVoteExtension implements [types.Application]. 164func (d *DIDPLCApplication) VerifyVoteExtension(context.Context, *abcitypes.RequestVerifyVoteExtension) (*abcitypes.ResponseVerifyVoteExtension, error) { 165 // TODO 166 return &abcitypes.ResponseVerifyVoteExtension{}, nil 167} 168 169// FinalizeBlock implements [types.Application]. 170func (d *DIDPLCApplication) FinalizeBlock(ctx context.Context, req *abcitypes.RequestFinalizeBlock) (*abcitypes.ResponseFinalizeBlock, error) { 171 if bytes.Equal(req.Hash, d.lastProcessedProposalHash) && d.lastProcessedProposalExecTxResults != nil { 172 // the block that was decided was the one we processed in ProcessProposal, and ProcessProposal processed successfully 173 // reuse the uncommitted results 174 return &abcitypes.ResponseFinalizeBlock{ 175 TxResults: lo.Map(d.lastProcessedProposalExecTxResults, func(result *processResult, _ int) *abcitypes.ExecTxResult { 176 return result.ToABCI() 177 }), 178 AppHash: d.tree.WorkingHash(), 179 }, nil 180 } 181 // a block other than the one we processed in ProcessProposal was decided 182 // discard the current modified state, and process the decided block 183 d.DiscardChanges() 184 185 txResults := make([]*processResult, len(req.Txs)) 186 for i, tx := range req.Txs { 187 var err error 188 txResults[i], err = processTx(ctx, d.transactionProcessorDependencies(), tx, req.Time, true) 189 if err != nil { 190 return nil, stacktrace.Propagate(err, "") 191 } 192 } 193 194 d.lastProcessedProposalHash = slices.Clone(req.Hash) 195 d.lastProcessedProposalExecTxResults = txResults 196 197 return &abcitypes.ResponseFinalizeBlock{ 198 TxResults: lo.Map(d.lastProcessedProposalExecTxResults, func(result *processResult, _ int) *abcitypes.ExecTxResult { 199 return result.ToABCI() 200 }), 201 AppHash: d.tree.WorkingHash(), 202 }, nil 203} 204 205// Commit implements [types.Application]. 206func (d *DIDPLCApplication) Commit(context.Context, *abcitypes.RequestCommit) (*abcitypes.ResponseCommit, error) { 207 _, newVersion, err := d.tree.SaveVersion() 208 if err != nil { 209 return nil, stacktrace.Propagate(err, "") 210 } 211 212 for _, r := range d.lastProcessedProposalExecTxResults { 213 for _, cb := range r.commitSideEffects { 214 cb() 215 } 216 } 217 218 minHeightToKeep := max(newVersion-5, 0) 219 minVerToKeep := max(minHeightToKeep-5, 0) 220 if minVerToKeep > 0 { 221 err = d.tree.DeleteVersionsTo(minVerToKeep) 222 if err != nil && !strings.Contains(err.Error(), "active readers") { 223 return nil, stacktrace.Propagate(err, "") 224 } 225 } 226 227 return &abcitypes.ResponseCommit{ 228 // TODO only discard actual blockchain history based on settings 229 //RetainHeight: minHeightToKeep, 230 }, nil 231} 232 233func (d *DIDPLCApplication) transactionProcessorDependencies() TransactionProcessorDependencies { 234 return TransactionProcessorDependencies{ 235 runnerContext: d.runnerContext, 236 plc: d.plc, 237 tree: d, 238 aocsByPLC: d.aocsByPLC, 239 } 240}