A very experimental PLC implementation which uses BFT consensus for decentralization

Misc cleanups

gbl08ma.com ca00ae98 fb2d74d9

verified
+3 -9
README.md
··· 26 - ...but export only works with timestamps (sequence-based export and websocket-based export is not implemented) 27 - Validation of operations (hopefully it's foolproof enough - needs revising, strengthening... and nullification might not be as simple as I made it out to be?) 28 - Snapshot-based sync for quickly bringing replicas up to date 29 30 ## What's yet to be implemented 31 32 - - Actually syncing with the authoritative source, plc.directory (I am in the middle of this, and I am not liking how many doubts I have about how to approach it) 33 - Spam/abuse prevention (typical blockchains do this with transaction fees and some lesser known ones - e.g. Nano - use proof of work, but this is not a cryptocurrency and PoW is a bit ewww) 34 - A process for nodes to become validators. Unless everyone agreed that it's best if the set of validators is centralized. I mean, it's not worse than the current state of plc.directory while still allowing people to easily have their own local synced mirror through the magic of CometBFT... 35 - Testing, testing, testing, validation, validation, validation... ··· 72 73 You need to install Go, in case the go.mod and multiple *.go files didn't make it obvious. 74 75 - To run a single node, you want to use `startfresh.sh`. To run multiple nodes, there's `startfresh-testnet.sh` but note that you'll have to go into the config.toml of every node **except one**, and add 76 - 77 - ``` 78 - [plc] 79 - laddr = "" 80 - ``` 81 - 82 - to prevent the PLC API server from coming up on those nodes (since by default it'd try listening on the same port that's already used by one of the other replicas, and fail to launch). 83 84 The PLC API server listens on `127.0.0.1:28080` by default and other fun facts you could learn from reading config.go. There are other ports that the server listens on, one that exposes the CometBFT RPC API, and also the P2P ports for communication between the nodes. 85
··· 26 - ...but export only works with timestamps (sequence-based export and websocket-based export is not implemented) 27 - Validation of operations (hopefully it's foolproof enough - needs revising, strengthening... and nullification might not be as simple as I made it out to be?) 28 - Snapshot-based sync for quickly bringing replicas up to date 29 + - An initial, not very well tested, version of the "authoritative import" that gradually brings in operations from the official plc.directory 30 31 ## What's yet to be implemented 32 33 + - Actually syncing with the authoritative source, plc.directory (in progress - fetching from the official directory is mostly implemented as indicated above; submitting blockchain happenings to the official directory is yet to be implemented) 34 - Spam/abuse prevention (typical blockchains do this with transaction fees and some lesser known ones - e.g. Nano - use proof of work, but this is not a cryptocurrency and PoW is a bit ewww) 35 - A process for nodes to become validators. Unless everyone agreed that it's best if the set of validators is centralized. I mean, it's not worse than the current state of plc.directory while still allowing people to easily have their own local synced mirror through the magic of CometBFT... 36 - Testing, testing, testing, validation, validation, validation... ··· 73 74 You need to install Go, in case the go.mod and multiple *.go files didn't make it obvious. 75 76 + To run a single node, you want to use `startfresh.sh`. To run multiple nodes, there's `startfresh-testnet.sh`. Note that using the created config files, only the first node will serve the PLC API. 77 78 The PLC API server listens on `127.0.0.1:28080` by default and other fun facts you could learn from reading config.go. There are other ports that the server listens on, one that exposes the CometBFT RPC API, and also the P2P ports for communication between the nodes. 79
+32 -27
abciapp/execution.go
··· 7 "time" 8 9 abcitypes "github.com/cometbft/cometbft/abci/types" 10 "github.com/palantir/stacktrace" 11 "github.com/samber/lo" 12 - "tangled.org/gbl08ma/didplcbft/store" 13 ) 14 15 // InitChain implements [types.Application]. ··· 22 func (d *DIDPLCApplication) PrepareProposal(ctx context.Context, req *abcitypes.RequestPrepareProposal) (*abcitypes.ResponsePrepareProposal, error) { 23 defer d.tree.Rollback() 24 25 st := time.Now() 26 acceptedTx := make([][]byte, 0, len(req.Txs)) 27 toProcess := req.Txs ··· 33 return nil, stacktrace.Propagate(err, "") 34 } 35 36 - if result.isAuthoritativeImportTransaction { 37 - // this type of transaction is not meant to appear in the mempool, 38 - // but maybe it's not impossible that a non-compliant node could have gossiped it to us? 39 - // (not sure if CometBFT checks transactions coming from other peers against CheckTx) 40 - continue 41 - } 42 - 43 if result.Code == 0 { 44 acceptedTx = append(acceptedTx, tx) 45 } else { ··· 69 70 if err == nil && len(maybeTx) != 0 { 71 totalSize := lo.SumBy(acceptedTx, func(tx []byte) int { return len(tx) }) 72 - // 4K safety margin 73 if totalSize+len(maybeTx) < int(req.MaxTxBytes)-4096 { 74 // we have space to fit the import transaction 75 ··· 94 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 95 } 96 97 - if req.Height == 1 { 98 - tree, err := d.MutableTree() 99 - if err != nil { 100 - return nil, stacktrace.Propagate(err, "") 101 - } 102 - 103 - err = store.Tree.SetAuthoritativePLC(tree, "https://plc.directory") 104 - if err != nil { 105 - return nil, stacktrace.Propagate(err, "") 106 - } 107 - } 108 - 109 // if we return early, ensure we don't use incomplete results where we haven't voted ACCEPT 110 d.lastProcessedProposalHash = nil 111 d.lastProcessedProposalExecTxResults = nil ··· 119 120 txResults := make([]*processResult, len(req.Txs)) 121 for i, tx := range req.Txs { 122 - result, err := processTx(ctx, d.transactionProcessorDependencies(), tx, req.Time, true) 123 if err != nil { 124 return nil, stacktrace.Propagate(err, "") 125 } 126 // when preparing a proposal, invalid transactions should have been discarded 127 // so, if something doesn't succeed now, something has gone wrong and we should not vote in agreement of the proposal 128 if result.Code != 0 { 129 - return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 130 - } 131 - 132 - if result.isAuthoritativeImportTransaction && i != len(req.Txs)-1 { 133 - // if an Authoritative Import transaction is present on the block, it must be the last one 134 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 135 } 136
··· 7 "time" 8 9 abcitypes "github.com/cometbft/cometbft/abci/types" 10 + cbornode "github.com/ipfs/go-ipld-cbor" 11 "github.com/palantir/stacktrace" 12 "github.com/samber/lo" 13 ) 14 15 // InitChain implements [types.Application]. ··· 22 func (d *DIDPLCApplication) PrepareProposal(ctx context.Context, req *abcitypes.RequestPrepareProposal) (*abcitypes.ResponsePrepareProposal, error) { 23 defer d.tree.Rollback() 24 25 + if req.Height == 2 { 26 + tx := Transaction[SetAuthoritativePlcArguments]{ 27 + Action: TransactionActionSetAuthoritativePlc, 28 + Arguments: SetAuthoritativePlcArguments{ 29 + PLCURL: "https://plc.directory", 30 + RestartImport: true, 31 + }, 32 + } 33 + 34 + out, err := cbornode.DumpObject(tx) 35 + if err != nil { 36 + return nil, stacktrace.Propagate(err, "") 37 + } 38 + 39 + req.Txs = append(req.Txs, out) 40 + } 41 + 42 st := time.Now() 43 acceptedTx := make([][]byte, 0, len(req.Txs)) 44 toProcess := req.Txs ··· 50 return nil, stacktrace.Propagate(err, "") 51 } 52 53 if result.Code == 0 { 54 acceptedTx = append(acceptedTx, tx) 55 } else { ··· 79 80 if err == nil && len(maybeTx) != 0 { 81 totalSize := lo.SumBy(acceptedTx, func(tx []byte) int { return len(tx) }) 82 + // 4 KB safety margin 83 if totalSize+len(maybeTx) < int(req.MaxTxBytes)-4096 { 84 // we have space to fit the import transaction 85 ··· 104 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 105 } 106 107 // if we return early, ensure we don't use incomplete results where we haven't voted ACCEPT 108 d.lastProcessedProposalHash = nil 109 d.lastProcessedProposalExecTxResults = nil ··· 117 118 txResults := make([]*processResult, len(req.Txs)) 119 for i, tx := range req.Txs { 120 + result, action, processor, err := beginProcessTx(tx) 121 if err != nil { 122 return nil, stacktrace.Propagate(err, "") 123 } 124 + if result.Code == 0 { 125 + if action == TransactionActionAuthoritativeImport && i != len(req.Txs)-1 { 126 + // if an Authoritative Import transaction is present on the block, it must be the last one 127 + return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 128 + } 129 + 130 + result, err = finishProcessTx(ctx, d.transactionProcessorDependencies(), processor, tx, req.Time, true) 131 + if err != nil { 132 + return nil, stacktrace.Propagate(err, "") 133 + } 134 + } 135 + 136 // when preparing a proposal, invalid transactions should have been discarded 137 // so, if something doesn't succeed now, something has gone wrong and we should not vote in agreement of the proposal 138 if result.Code != 0 { 139 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 140 } 141
+15 -7
abciapp/mempool.go
··· 10 11 // CheckTx implements [types.Application]. 12 func (d *DIDPLCApplication) CheckTx(ctx context.Context, req *abcitypes.RequestCheckTx) (*abcitypes.ResponseCheckTx, error) { 13 - result, err := processTx(ctx, d.transactionProcessorDependencies(), req.Tx, time.Now(), false) 14 if err != nil { 15 return nil, stacktrace.Propagate(err, "") 16 } 17 - if result.isAuthoritativeImportTransaction { 18 - // this type of transaction is meant to be included only by validator nodes 19 - return &abcitypes.ResponseCheckTx{ 20 - Code: 4002, 21 - Info: "AuthoritativeImport transactions can only be introduced by validator nodes", 22 - }, nil 23 } 24 return &abcitypes.ResponseCheckTx{ 25 Code: result.Code, 26 Data: result.Data,
··· 10 11 // CheckTx implements [types.Application]. 12 func (d *DIDPLCApplication) CheckTx(ctx context.Context, req *abcitypes.RequestCheckTx) (*abcitypes.ResponseCheckTx, error) { 13 + result, action, processor, err := beginProcessTx(req.Tx) 14 if err != nil { 15 return nil, stacktrace.Propagate(err, "") 16 } 17 + if result.Code == 0 { 18 + if action == TransactionActionAuthoritativeImport { 19 + // this type of transaction is meant to be included only by validator nodes 20 + return &abcitypes.ResponseCheckTx{ 21 + Code: 4002, 22 + Info: "AuthoritativeImport transactions can only be introduced by validator nodes", 23 + }, nil 24 + } 25 + 26 + result, err = finishProcessTx(ctx, d.transactionProcessorDependencies(), processor, req.Tx, time.Now(), false) 27 + if err != nil { 28 + return nil, stacktrace.Propagate(err, "") 29 + } 30 } 31 + 32 return &abcitypes.ResponseCheckTx{ 33 Code: result.Code, 34 Data: result.Data,
+28 -10
abciapp/tx.go
··· 83 } 84 85 type processResult struct { 86 - isAuthoritativeImportTransaction bool 87 - commitSideEffects []func() 88 89 Code uint32 90 Data []byte ··· 109 } 110 } 111 112 - func processTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) { 113 if !IsTransactionSanitized(txBytes) { 114 return &processResult{ 115 Code: 4000, 116 Info: "Transaction bytes do not follow canonical serialization format", 117 - }, nil 118 } 119 var v map[string]interface{} 120 err := cbornode.DecodeInto(txBytes, &v) ··· 122 return &processResult{ 123 Code: 4001, 124 Info: "Invalid transaction", 125 - }, nil 126 } 127 actionInterface, ok := v["action"] 128 if !ok { 129 return &processResult{ 130 Code: 4001, 131 Info: "Unknown transaction action", 132 - }, nil 133 } 134 - action, ok := actionInterface.(string) 135 if !ok { 136 return &processResult{ 137 Code: 4001, 138 Info: "Unknown transaction action", 139 - }, nil 140 } 141 142 - processor, ok := knownActions[TransactionAction(action)] 143 if !ok { 144 return &processResult{ 145 Code: 4001, 146 Info: "Unknown transaction action", 147 - }, nil 148 } 149 150 result, err := processor(ctx, deps, txBytes, atTime, execute) 151 return result, stacktrace.Propagate(err, "") 152 }
··· 83 } 84 85 type processResult struct { 86 + commitSideEffects []func() 87 88 Code uint32 89 Data []byte ··· 108 } 109 } 110 111 + func beginProcessTx(txBytes []byte) (*processResult, TransactionAction, TransactionProcessor, error) { 112 if !IsTransactionSanitized(txBytes) { 113 return &processResult{ 114 Code: 4000, 115 Info: "Transaction bytes do not follow canonical serialization format", 116 + }, "", nil, nil 117 } 118 var v map[string]interface{} 119 err := cbornode.DecodeInto(txBytes, &v) ··· 121 return &processResult{ 122 Code: 4001, 123 Info: "Invalid transaction", 124 + }, "", nil, nil 125 } 126 actionInterface, ok := v["action"] 127 if !ok { 128 return &processResult{ 129 Code: 4001, 130 Info: "Unknown transaction action", 131 + }, "", nil, nil 132 } 133 + actionString, ok := actionInterface.(string) 134 if !ok { 135 return &processResult{ 136 Code: 4001, 137 Info: "Unknown transaction action", 138 + }, "", nil, nil 139 } 140 141 + action := TransactionAction(actionString) 142 + 143 + processor, ok := knownActions[action] 144 if !ok { 145 return &processResult{ 146 Code: 4001, 147 Info: "Unknown transaction action", 148 + }, "", nil, nil 149 } 150 151 + return &processResult{}, action, processor, nil 152 + } 153 + 154 + func finishProcessTx(ctx context.Context, deps TransactionProcessorDependencies, processor TransactionProcessor, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) { 155 result, err := processor(ctx, deps, txBytes, atTime, execute) 156 return result, stacktrace.Propagate(err, "") 157 } 158 + 159 + func processTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) { 160 + result, _, processor, err := beginProcessTx(txBytes) 161 + if err != nil { 162 + return nil, stacktrace.Propagate(err, "") 163 + } 164 + if result.Code != 0 { 165 + return result, nil 166 + } 167 + 168 + result, err = finishProcessTx(ctx, deps, processor, txBytes, atTime, execute) 169 + return result, stacktrace.Propagate(err, "") 170 + }
-1
abciapp/tx_import.go
··· 187 } 188 189 return &processResult{ 190 - isAuthoritativeImportTransaction: true, 191 commitSideEffects: []func(){ 192 func() { 193 aoc.dropSeqBelowOrEqual(newCursor)
··· 187 } 188 189 return &processResult{ 190 commitSideEffects: []func(){ 191 func() { 192 aoc.dropSeqBelowOrEqual(newCursor)
+4
startfresh-testnet.sh
··· 47 48 # Adjust P2P listen address 49 sed -i "s|^laddr = \"tcp://0.0.0.0:26656\"\$|laddr = \"tcp://$p2p_ip:26656\"|g" "testnet/node$i/config/config.toml" 50 done 51 52 # Configure rpc_servers for the last node (the one that will be started manually)
··· 47 48 # Adjust P2P listen address 49 sed -i "s|^laddr = \"tcp://0.0.0.0:26656\"\$|laddr = \"tcp://$p2p_ip:26656\"|g" "testnet/node$i/config/config.toml" 50 + 51 + if [ "$i" -ne 0 ]; then 52 + echo -e "\n[plc]\nladdr = \"\"" >> "testnet/node$i/config/config.toml" 53 + fi 54 done 55 56 # Configure rpc_servers for the last node (the one that will be started manually)