A very experimental PLC implementation which uses BFT consensus for decentralization
at main 5.3 kB view raw
1package abciapp 2 3import ( 4 "context" 5 "encoding/hex" 6 "net/url" 7 8 cbornode "github.com/ipfs/go-ipld-cbor" 9 "github.com/palantir/stacktrace" 10 "tangled.org/gbl08ma.com/didplcbft/store" 11) 12 13var TransactionActionSetAuthoritativePlc = registerTransactionAction[SetAuthoritativePlcArguments]("SetAuthoritativePlc", processSetAuthoritativePlcTx) 14 15type SetAuthoritativePlcArguments struct { 16 PLCURL string `json:"plcURL" refmt:"plcURL"` 17 RestartImport bool `json:"restartImport" refmt:"restartImport"` 18} 19 20func (SetAuthoritativePlcArguments) ForAction() TransactionAction { 21 return TransactionActionSetAuthoritativePlc 22} 23 24func init() { 25 cbornode.RegisterCborType(SetAuthoritativePlcArguments{}) 26 cbornode.RegisterCborType(Transaction[SetAuthoritativePlcArguments]{}) 27} 28 29func processSetAuthoritativePlcTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte) (*processResult, error) { 30 tx, err := UnmarshalTransaction[SetAuthoritativePlcArguments](txBytes) 31 if err != nil { 32 return &processResult{ 33 Code: 4000, 34 Info: err.Error(), 35 }, nil 36 } 37 38 // TODO this transaction must somehow validate that whoever submitted it has the permission to change this 39 // Does it even make sense to keep this operation type as something submitted via the mempool in the long run, 40 // or would it be tied to some sort of proposal/participation system, where the validators submit this operation type in response to some on-chain trigger? 41 42 // A simple solution in the short term might be to just validate a "simple" public-private signature + an expiry timestamp (to prevent replay attacks) 43 // which would both be part of the SetAuthoritativePlcArguments. Very centralized, but very straightforward 44 // (the public key would be part of the config or even hardcoded for good measure) 45 46 if tx.Arguments.PLCURL != "" { 47 parsed, err := url.Parse(tx.Arguments.PLCURL) 48 if err != nil || parsed.Scheme != "https" { 49 return &processResult{ 50 Code: 4100, 51 Info: "Malformed Authoritative PLC URL", 52 }, nil 53 } 54 } 55 56 if writeTx, ok := deps.writeTx.Get(); ok { 57 err = store.Tree.SetAuthoritativePLC(writeTx, tx.Arguments.PLCURL) 58 if err != nil { 59 return nil, stacktrace.Propagate(err, "") 60 } 61 62 if tx.Arguments.RestartImport { 63 err = store.Tree.SetAuthoritativeImportProgress(writeTx, 0) 64 if err != nil { 65 return nil, stacktrace.Propagate(err, "") 66 } 67 } 68 } 69 70 return &processResult{ 71 Code: 0, 72 }, nil 73} 74 75var TransactionActionAuthoritativeImport = registerTransactionAction[AuthoritativeImportArguments]("AuthoritativeImport", processAuthoritativeImportTx) 76 77type AuthoritativeImportArguments struct { 78 PLCURL string `json:"plcURL" refmt:"plcURL"` 79 Cursor uint64 `json:"cursor" refmt:"cursor"` 80 Count uint64 `json:"count" refmt:"count"` 81 Hash string `json:"hash" refmt:"hash"` 82} 83 84func (AuthoritativeImportArguments) ForAction() TransactionAction { 85 return TransactionActionAuthoritativeImport 86} 87 88func init() { 89 cbornode.RegisterCborType(AuthoritativeImportArguments{}) 90 cbornode.RegisterCborType(Transaction[AuthoritativeImportArguments]{}) 91} 92 93func processAuthoritativeImportTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte) (*processResult, error) { 94 tx, err := UnmarshalTransaction[AuthoritativeImportArguments](txBytes) 95 if err != nil { 96 return &processResult{ 97 Code: 4000, 98 Info: err.Error(), 99 }, nil 100 } 101 102 expectedPlcUrl, err := store.Tree.AuthoritativePLC(deps.readTx) 103 if err != nil { 104 return nil, stacktrace.Propagate(err, "") 105 } 106 107 if expectedPlcUrl != tx.Arguments.PLCURL { 108 return &processResult{ 109 Code: 4110, 110 Info: "Unexpected Authoritative PLC URL", 111 }, nil 112 } 113 114 aoc := getOrCreateAuthoritativeOperationsCache(deps.runnerContext, deps.aocsByPLC, expectedPlcUrl) 115 116 expectedCursor, err := store.Tree.AuthoritativeImportProgress(deps.readTx) 117 if err != nil { 118 return nil, stacktrace.Propagate(err, "") 119 } 120 121 if expectedCursor != tx.Arguments.Cursor { 122 return &processResult{ 123 Code: 4111, 124 Info: "Unexpected import cursor", 125 }, nil 126 } 127 128 operations, err := aoc.get(ctx, expectedCursor, tx.Arguments.Count) 129 if err != nil { 130 return &processResult{ 131 Code: 4112, 132 Info: "Failure to obtain authoritative operations", 133 }, nil 134 } 135 136 if uint64(len(operations)) < tx.Arguments.Count { 137 return &processResult{ 138 Code: 4113, 139 Info: "Unexpected import count", 140 }, nil 141 } 142 143 expectedHashBytes, err := computeLogEntriesHash(operations) 144 if err != nil { 145 return nil, stacktrace.Propagate(err, "") 146 } 147 148 if hex.EncodeToString(expectedHashBytes) != tx.Arguments.Hash { 149 return &processResult{ 150 Code: 4114, 151 Info: "Unexpected import hash", 152 }, nil 153 } 154 155 newCursor := expectedCursor 156 if len(operations) > 0 { 157 newCursor = operations[len(operations)-1].Seq 158 } 159 160 if writeTx, ok := deps.writeTx.Get(); ok { 161 for _, op := range operations { 162 err := deps.plc.ImportOperationFromAuthoritativeSource(ctx, writeTx, op.LogEntry) 163 if err != nil { 164 return nil, stacktrace.Propagate(err, "") 165 } 166 } 167 168 err = store.Tree.SetAuthoritativeImportProgress(writeTx, newCursor) 169 if err != nil { 170 return nil, stacktrace.Propagate(err, "") 171 } 172 } 173 174 return &processResult{ 175 commitSideEffects: []func(){ 176 func() { 177 aoc.dropSeqBelowOrEqual(newCursor) 178 }, 179 }, 180 Code: 0, 181 }, nil 182}