A very experimental PLC implementation which uses BFT consensus for decentralization

Continue work on authoritative import

gbl08ma.com d984008e 50f5461a

verified
+38 -4
abciapp/execution.go
··· 8 9 abcitypes "github.com/cometbft/cometbft/abci/types" 10 "github.com/palantir/stacktrace" 11 ) 12 13 // InitChain implements [types.Application]. ··· 19 // PrepareProposal implements [types.Application]. 20 func (d *DIDPLCApplication) PrepareProposal(ctx context.Context, req *abcitypes.RequestPrepareProposal) (*abcitypes.ResponsePrepareProposal, error) { 21 defer d.tree.Rollback() 22 23 st := time.Now() 24 acceptedTx := make([][]byte, 0, len(req.Txs)) ··· 26 for { 27 toTryNext := [][]byte{} 28 for _, tx := range toProcess { 29 - result, err := processTx(ctx, d.plc, tx, req.Time, true) 30 if err != nil { 31 return nil, stacktrace.Propagate(err, "") 32 } 33 34 if result.Code == 0 { 35 acceptedTx = append(acceptedTx, tx) 36 } else { ··· 39 toTryNext = append(toTryNext, tx) 40 } 41 } 42 - if len(toProcess) == len(toTryNext) { 43 // we made no progress in this iteration - all transactions left to process fail to do so 44 // so they can't be depending on anything that would be included in this block, at this point 45 // just continue while dropping the transactions that would never succeed in this block ··· 52 toProcess = toTryNext 53 } 54 55 return &abcitypes.ResponsePrepareProposal{Txs: acceptedTx}, nil 56 } 57 ··· 74 } 75 }() 76 77 txResults := make([]*abcitypes.ExecTxResult, len(req.Txs)) 78 for i, tx := range req.Txs { 79 - result, err := processTx(ctx, d.plc, tx, req.Time, true) 80 if err != nil { 81 return nil, stacktrace.Propagate(err, "") 82 } ··· 86 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 87 } 88 89 txResults[i] = &abcitypes.ExecTxResult{ 90 Code: result.Code, 91 Data: result.Data, ··· 130 // discard the current modified state, and process the decided block 131 d.tree.Rollback() 132 133 txResults := make([]*abcitypes.ExecTxResult, len(req.Txs)) 134 for i, tx := range req.Txs { 135 - result, err := processTx(ctx, d.plc, tx, req.Time, true) 136 if err != nil { 137 return nil, stacktrace.Propagate(err, "") 138 }
··· 8 9 abcitypes "github.com/cometbft/cometbft/abci/types" 10 "github.com/palantir/stacktrace" 11 + "github.com/samber/lo" 12 ) 13 14 // InitChain implements [types.Application]. ··· 20 // PrepareProposal implements [types.Application]. 21 func (d *DIDPLCApplication) PrepareProposal(ctx context.Context, req *abcitypes.RequestPrepareProposal) (*abcitypes.ResponsePrepareProposal, error) { 22 defer d.tree.Rollback() 23 + 24 + deps := TransactionProcessorDependencies{ 25 + plc: d.plc, 26 + tree: d, 27 + } 28 29 st := time.Now() 30 acceptedTx := make([][]byte, 0, len(req.Txs)) ··· 32 for { 33 toTryNext := [][]byte{} 34 for _, tx := range toProcess { 35 + result, err := processTx(ctx, deps, tx, req.Time, true) 36 if err != nil { 37 return nil, stacktrace.Propagate(err, "") 38 } 39 40 + if result.IsAuthoritativeImportTransaction { 41 + // this type of transaction is not meant to appear in the mempool, 42 + // but maybe it's not impossible that a non-compliant node could have gossiped it to us? 43 + // (not sure if CometBFT checks transactions coming from other peers against CheckTx) 44 + continue 45 + } 46 + 47 if result.Code == 0 { 48 acceptedTx = append(acceptedTx, tx) 49 } else { ··· 52 toTryNext = append(toTryNext, tx) 53 } 54 } 55 + if len(toProcess) >= len(toTryNext) { 56 // we made no progress in this iteration - all transactions left to process fail to do so 57 // so they can't be depending on anything that would be included in this block, at this point 58 // just continue while dropping the transactions that would never succeed in this block ··· 65 toProcess = toTryNext 66 } 67 68 + totalSize := lo.SumBy(acceptedTx, func(tx []byte) int { return len(tx) }) 69 + if totalSize < int(req.MaxTxBytes)-4096 { 70 + // we have space to fit an import transaction 71 + // TODO 72 + } 73 + 74 return &abcitypes.ResponsePrepareProposal{Txs: acceptedTx}, nil 75 } 76 ··· 93 } 94 }() 95 96 + deps := TransactionProcessorDependencies{ 97 + plc: d.plc, 98 + tree: d, 99 + } 100 + 101 txResults := make([]*abcitypes.ExecTxResult, len(req.Txs)) 102 for i, tx := range req.Txs { 103 + result, err := processTx(ctx, deps, tx, req.Time, true) 104 if err != nil { 105 return nil, stacktrace.Propagate(err, "") 106 } ··· 110 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 111 } 112 113 + if result.IsAuthoritativeImportTransaction && i != len(req.Txs)-1 { 114 + // if an Authoritative Import transaction is present on the block, it must be the last one 115 + return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 116 + } 117 + 118 txResults[i] = &abcitypes.ExecTxResult{ 119 Code: result.Code, 120 Data: result.Data, ··· 159 // discard the current modified state, and process the decided block 160 d.tree.Rollback() 161 162 + deps := TransactionProcessorDependencies{ 163 + plc: d.plc, 164 + tree: d, 165 + } 166 + 167 txResults := make([]*abcitypes.ExecTxResult, len(req.Txs)) 168 for i, tx := range req.Txs { 169 + result, err := processTx(ctx, deps, tx, req.Time, true) 170 if err != nil { 171 return nil, stacktrace.Propagate(err, "") 172 }
+136
abciapp/import.go
···
··· 1 + package abciapp 2 + 3 + import ( 4 + "bufio" 5 + "context" 6 + "crypto/sha256" 7 + "encoding/binary" 8 + "encoding/json" 9 + "fmt" 10 + "net/http" 11 + "net/url" 12 + "time" 13 + 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "github.com/did-method-plc/go-didplc" 16 + "github.com/ipfs/go-cid" 17 + "github.com/palantir/stacktrace" 18 + "github.com/samber/lo" 19 + "tangled.org/gbl08ma/didplcbft/store" 20 + ) 21 + 22 + func fetchExportedBatchFromAuthoritativeSource(ctx context.Context, plcURL string, startAt, maxCount uint64) ([]didplc.LogEntry, uint64, error) { 23 + baseURL, err := url.JoinPath(plcURL, "/export") 24 + if err != nil { 25 + return nil, 0, stacktrace.Propagate(err, "") 26 + } 27 + 28 + client := &http.Client{Timeout: 30 * time.Second} 29 + 30 + entries := make([]didplc.LogEntry, 0, maxCount) 31 + for { 32 + req, err := http.NewRequestWithContext(ctx, "GET", baseURL, nil) 33 + if err != nil { 34 + return nil, 0, stacktrace.Propagate(err, "") 35 + } 36 + 37 + req.Header.Set("User-Agent", "go-did-method-plc") 38 + 39 + requestCount := min(1000, maxCount-uint64(len(entries))) 40 + 41 + q := req.URL.Query() 42 + q.Add("count", fmt.Sprint(requestCount)) 43 + q.Add("after", fmt.Sprint(startAt)) 44 + req.URL.RawQuery = q.Encode() 45 + 46 + resp, err := client.Do(req) 47 + if err != nil { 48 + return nil, 0, stacktrace.Propagate(err, "") 49 + } 50 + defer resp.Body.Close() 51 + 52 + if resp.StatusCode != http.StatusOK { 53 + return nil, 0, stacktrace.NewError("non-200 status code") 54 + } 55 + 56 + type logEntryWithSeq struct { 57 + didplc.LogEntry 58 + Seq uint64 `json:"seq"` 59 + } 60 + 61 + // Read response body 62 + s := bufio.NewScanner(resp.Body) 63 + numEntriesThisResponse := 0 64 + for s.Scan() && len(entries) < int(maxCount) { 65 + var entry logEntryWithSeq 66 + if err := json.Unmarshal(s.Bytes(), &entry); err != nil { 67 + return nil, 0, stacktrace.Propagate(err, "") 68 + } 69 + entries = append(entries, entry.LogEntry) 70 + numEntriesThisResponse++ 71 + startAt = entry.Seq 72 + } 73 + if s.Err() != nil { 74 + return nil, 0, stacktrace.Propagate(s.Err(), "") 75 + } 76 + 77 + if uint64(numEntriesThisResponse) < requestCount || len(entries) >= int(maxCount) { 78 + break 79 + } 80 + } 81 + 82 + return entries, startAt, nil 83 + } 84 + 85 + func computeLogEntriesHash(logEntries []didplc.LogEntry) ([]byte, error) { 86 + // let's _not_ rely on the specifics of the JSON representation 87 + // (instead let's rely on specifics of our implementation, heh) 88 + 89 + hash := sha256.New() 90 + for i, entry := range logEntries { 91 + // Write DID 92 + didBytes, err := store.DIDToBytes(entry.DID) 93 + if err != nil { 94 + return nil, stacktrace.Propagate(err, "invalid DID in entry index %d", i) 95 + } 96 + 97 + _, err = hash.Write(didBytes) 98 + if err != nil { 99 + return nil, stacktrace.Propagate(err, "") 100 + } 101 + 102 + // Write CID 103 + // (We trust that the authoritative source computed CIDs properly, so we use theirs rather than decoding the operation and recomputing) 104 + cid, err := cid.Decode(entry.CID) 105 + if err != nil { 106 + return nil, stacktrace.Propagate(err, "invalid CID in entry index %d", i) 107 + } 108 + 109 + _, err = hash.Write(cid.Bytes()) 110 + if err != nil { 111 + return nil, stacktrace.Propagate(err, "") 112 + } 113 + 114 + // Write CreatedAt 115 + createdAt, err := syntax.ParseDatetime(entry.CreatedAt) 116 + if err != nil { 117 + return nil, stacktrace.Propagate(err, "invalid CreatedAt in entry index %d", i) 118 + } 119 + 120 + tsBytes := make([]byte, 8) 121 + binary.BigEndian.PutUint64(tsBytes, uint64(createdAt.Time().Truncate(1*time.Millisecond).UTC().UnixNano())) 122 + 123 + _, err = hash.Write(tsBytes) 124 + if err != nil { 125 + return nil, stacktrace.Propagate(err, "") 126 + } 127 + 128 + // Write Nullified 129 + _, err = hash.Write([]byte{lo.Ternary[byte](entry.Nullified, 1, 0)}) 130 + if err != nil { 131 + return nil, stacktrace.Propagate(err, "") 132 + } 133 + } 134 + 135 + return hash.Sum(nil), nil 136 + }
+13 -1
abciapp/mempool.go
··· 10 11 // CheckTx implements [types.Application]. 12 func (d *DIDPLCApplication) CheckTx(ctx context.Context, req *abcitypes.RequestCheckTx) (*abcitypes.ResponseCheckTx, error) { 13 - result, err := processTx(ctx, d.plc, req.Tx, time.Now(), false) 14 if err != nil { 15 return nil, stacktrace.Propagate(err, "") 16 } 17 return &abcitypes.ResponseCheckTx{ 18 Code: result.Code,
··· 10 11 // CheckTx implements [types.Application]. 12 func (d *DIDPLCApplication) CheckTx(ctx context.Context, req *abcitypes.RequestCheckTx) (*abcitypes.ResponseCheckTx, error) { 13 + deps := TransactionProcessorDependencies{ 14 + plc: d.plc, 15 + tree: d, 16 + } 17 + 18 + result, err := processTx(ctx, deps, req.Tx, time.Now(), false) 19 if err != nil { 20 return nil, stacktrace.Propagate(err, "") 21 + } 22 + if result.IsAuthoritativeImportTransaction { 23 + // this type of transaction is meant to be included only by validator nodes 24 + return &abcitypes.ResponseCheckTx{ 25 + Code: 4002, 26 + Info: "AuthoritativeImport transactions can only be introduced by validator nodes", 27 + }, nil 28 } 29 return &abcitypes.ResponseCheckTx{ 30 Code: result.Code,
+19 -18
abciapp/tx.go
··· 17 18 type TransactionAction string 19 20 - var ( 21 - knownActions = map[TransactionAction]struct{}{} 22 - TransactionActionCreatePlcOp = registerTransactionAction[CreatePlcOpArguments]("CreatePlcOp") 23 - TransactionActionAuthoritativeImport = registerTransactionAction[AuthoritativeImportArguments]("AuthoritativeImport") 24 - ) 25 26 - func registerTransactionAction[ArgType ArgumentType](action string) TransactionAction { 27 ta := TransactionAction(action) 28 if _, present := knownActions[ta]; present { 29 panic("action already registered") ··· 32 if argType.ForAction() != "" && argType.ForAction() != ta { 33 panic("mismatched argument types") 34 } 35 - knownActions[ta] = struct{}{} 36 return ta 37 } 38 ··· 79 } 80 81 type processResult struct { 82 Code uint32 83 Data []byte 84 Log string ··· 89 Codespace string 90 } 91 92 - func processTx(ctx context.Context, p plc.PLC, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) { 93 if !IsTransactionSanitized(txBytes) { 94 return &processResult{ 95 Code: 4000, ··· 119 }, nil 120 } 121 122 - var result *processResult 123 - switch TransactionAction(action) { 124 - case TransactionActionCreatePlcOp: 125 - result, err = processCreatePlcOpTx(ctx, p, txBytes, atTime, execute) 126 - case TransactionActionAuthoritativeImport: 127 - result, err = processAuthoritativeImportTx(ctx, p, txBytes, atTime, execute) 128 - default: 129 - result = &processResult{ 130 Code: 4001, 131 Info: "Unknown transaction action", 132 - } 133 - 134 } 135 return result, stacktrace.Propagate(err, "") 136 }
··· 17 18 type TransactionAction string 19 20 + type TransactionProcessorDependencies struct { 21 + plc plc.PLC 22 + tree plc.TreeProvider // TODO maybe we should move the TreeProvider definition out of the plc package then? 23 + } 24 + 25 + type TransactionProcessor func(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) 26 + 27 + var knownActions = map[TransactionAction]TransactionProcessor{} 28 29 + func registerTransactionAction[ArgType ArgumentType](action string, processor TransactionProcessor) TransactionAction { 30 ta := TransactionAction(action) 31 if _, present := knownActions[ta]; present { 32 panic("action already registered") ··· 35 if argType.ForAction() != "" && argType.ForAction() != ta { 36 panic("mismatched argument types") 37 } 38 + knownActions[ta] = processor 39 return ta 40 } 41 ··· 82 } 83 84 type processResult struct { 85 + IsAuthoritativeImportTransaction bool 86 + 87 Code uint32 88 Data []byte 89 Log string ··· 94 Codespace string 95 } 96 97 + func processTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) { 98 if !IsTransactionSanitized(txBytes) { 99 return &processResult{ 100 Code: 4000, ··· 124 }, nil 125 } 126 127 + processor, ok := knownActions[TransactionAction(action)] 128 + if !ok { 129 + return &processResult{ 130 Code: 4001, 131 Info: "Unknown transaction action", 132 + }, nil 133 } 134 + 135 + result, err := processor(ctx, deps, txBytes, atTime, execute) 136 return result, stacktrace.Propagate(err, "") 137 }
+5 -3
abciapp/tx_create_plc_op.go
··· 11 "tangled.org/gbl08ma/didplcbft/plc" 12 ) 13 14 type CreatePlcOpArguments struct { 15 DID string `json:"did" refmt:"did"` 16 Operation *didplc.OpEnum `refmt:"operation"` ··· 25 cbornode.RegisterCborType(Transaction[CreatePlcOpArguments]{}) 26 } 27 28 - func processCreatePlcOpTx(ctx context.Context, p plc.PLC, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) { 29 tx, err := UnmarshalTransaction[CreatePlcOpArguments](txBytes) 30 if err != nil { 31 return &processResult{ ··· 52 } 53 54 if execute { 55 - err = p.ExecuteOperation(ctx, atTime, tx.Arguments.DID, opBytes) 56 } else { 57 - err = p.ValidateOperation(ctx, plc.CommittedTreeVersion, atTime, tx.Arguments.DID, opBytes) 58 } 59 if err != nil { 60 if code, ok := plc.InvalidOperationErrorCode(err); ok {
··· 11 "tangled.org/gbl08ma/didplcbft/plc" 12 ) 13 14 + var TransactionActionCreatePlcOp = registerTransactionAction[CreatePlcOpArguments]("CreatePlcOp", processCreatePlcOpTx) 15 + 16 type CreatePlcOpArguments struct { 17 DID string `json:"did" refmt:"did"` 18 Operation *didplc.OpEnum `refmt:"operation"` ··· 27 cbornode.RegisterCborType(Transaction[CreatePlcOpArguments]{}) 28 } 29 30 + func processCreatePlcOpTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) { 31 tx, err := UnmarshalTransaction[CreatePlcOpArguments](txBytes) 32 if err != nil { 33 return &processResult{ ··· 54 } 55 56 if execute { 57 + err = deps.plc.ExecuteOperation(ctx, atTime, tx.Arguments.DID, opBytes) 58 } else { 59 + err = deps.plc.ValidateOperation(ctx, plc.CommittedTreeVersion, atTime, tx.Arguments.DID, opBytes) 60 } 61 if err != nil { 62 if code, ok := plc.InvalidOperationErrorCode(err); ok {
+170 -4
abciapp/tx_import.go
··· 2 3 import ( 4 "context" 5 "time" 6 7 cbornode "github.com/ipfs/go-ipld-cbor" 8 "github.com/palantir/stacktrace" 9 "tangled.org/gbl08ma/didplcbft/plc" 10 ) 11 12 type AuthoritativeImportArguments struct { 13 - Cursor string `json:"cursor" refmt:"cursor"` 14 Hash string `json:"hash" refmt:"hash"` 15 } 16 ··· 23 cbornode.RegisterCborType(Transaction[AuthoritativeImportArguments]{}) 24 } 25 26 - func processAuthoritativeImportTx(ctx context.Context, p plc.PLC, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) { 27 - // TODO 28 - return nil, stacktrace.NewError("not implemented") 29 }
··· 2 3 import ( 4 "context" 5 + "encoding/hex" 6 + "net/url" 7 "time" 8 9 + "github.com/did-method-plc/go-didplc" 10 cbornode "github.com/ipfs/go-ipld-cbor" 11 "github.com/palantir/stacktrace" 12 "tangled.org/gbl08ma/didplcbft/plc" 13 + "tangled.org/gbl08ma/didplcbft/store" 14 ) 15 16 + var TransactionActionSetAuthoritativePlc = registerTransactionAction[SetAuthoritativePlcArguments]("SetAuthoritativePlc", processSetAuthoritativePlcTx) 17 + 18 + type SetAuthoritativePlcArguments struct { 19 + PLCURL string `json:"plcURL" refmt:"plcURL"` 20 + RestartImport bool `json:"restartImport" refmt:"restartImport"` 21 + } 22 + 23 + func (SetAuthoritativePlcArguments) ForAction() TransactionAction { 24 + return TransactionActionSetAuthoritativePlc 25 + } 26 + 27 + func init() { 28 + cbornode.RegisterCborType(SetAuthoritativePlcArguments{}) 29 + cbornode.RegisterCborType(Transaction[SetAuthoritativePlcArguments]{}) 30 + } 31 + 32 + func processSetAuthoritativePlcTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) { 33 + tx, err := UnmarshalTransaction[SetAuthoritativePlcArguments](txBytes) 34 + if err != nil { 35 + return &processResult{ 36 + Code: 4000, 37 + Info: err.Error(), 38 + }, nil 39 + } 40 + 41 + // TODO this transaction must somehow validate that whoever submitted it has the permission to change this 42 + // Does it even make sense to keep this operation type as something submitted via the mempool in the long run, 43 + // or would it be tied to some sort of proposal/participation system, where the validators submit this operation type in response to some on-chain trigger? 44 + 45 + // A simple solution in the short term might be to just validate a "simple" public-private signature + an expiry timestamp (to prevent replay attacks) 46 + // which would both be part of the SetAuthoritativePlcArguments. Very centralized, but very straightforward 47 + // (the public key would be part of the config or even hardcoded for good measure) 48 + 49 + if tx.Arguments.PLCURL != "" { 50 + parsed, err := url.Parse(tx.Arguments.PLCURL) 51 + if err != nil || parsed.Scheme != "https" { 52 + return &processResult{ 53 + Code: 4100, 54 + Info: "Malformed Authoritative PLC URL", 55 + }, nil 56 + } 57 + } 58 + 59 + if execute { 60 + tree, err := deps.tree.MutableTree() 61 + if err != nil { 62 + return nil, stacktrace.Propagate(err, "") 63 + } 64 + err = store.Tree.SetAuthoritativePLC(tree, tx.Arguments.PLCURL) 65 + if err != nil { 66 + return nil, stacktrace.Propagate(err, "") 67 + } 68 + 69 + if tx.Arguments.RestartImport { 70 + err = store.Tree.SetAuthoritativeImportProgress(tree, 0) 71 + if err != nil { 72 + return nil, stacktrace.Propagate(err, "") 73 + } 74 + } 75 + } 76 + 77 + return &processResult{ 78 + Code: 0, 79 + }, nil 80 + } 81 + 82 + var TransactionActionAuthoritativeImport = registerTransactionAction[AuthoritativeImportArguments]("AuthoritativeImport", processAuthoritativeImportTx) 83 + 84 type AuthoritativeImportArguments struct { 85 + PLCURL string `json:"plcURL" refmt:"plcURL"` 86 + Cursor uint64 `json:"cursor" refmt:"cursor"` 87 + Count uint64 `json:"count" refmt:"count"` 88 Hash string `json:"hash" refmt:"hash"` 89 } 90 ··· 97 cbornode.RegisterCborType(Transaction[AuthoritativeImportArguments]{}) 98 } 99 100 + func processAuthoritativeImportTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) { 101 + tx, err := UnmarshalTransaction[AuthoritativeImportArguments](txBytes) 102 + if err != nil { 103 + return &processResult{ 104 + Code: 4000, 105 + Info: err.Error(), 106 + }, nil 107 + } 108 + 109 + roTree, err := deps.tree.ImmutableTree(plc.CommittedTreeVersion) 110 + if err != nil { 111 + return nil, stacktrace.Propagate(err, "") 112 + } 113 + 114 + expectedPlcUrl, err := store.Tree.AuthoritativePLC(roTree) 115 + if err != nil { 116 + return nil, stacktrace.Propagate(err, "") 117 + } 118 + 119 + if expectedPlcUrl != tx.Arguments.PLCURL { 120 + return &processResult{ 121 + Code: 4110, 122 + Info: "Unexpected Authoritative PLC URL", 123 + }, nil 124 + } 125 + 126 + expectedCursor, err := store.Tree.AuthoritativeImportProgress(roTree) 127 + if err != nil { 128 + return nil, stacktrace.Propagate(err, "") 129 + } 130 + 131 + if expectedCursor != tx.Arguments.Cursor { 132 + return &processResult{ 133 + Code: 4111, 134 + Info: "Unexpected import cursor", 135 + }, nil 136 + } 137 + 138 + // TODO this shouldn't be happening synchronously! We should always be ahead of the next transaction! 139 + // or at the very least it should only happen once (e.g. when processing the proposal) and then we should cache until it expires or until we actually commit 140 + operations, newCursor, err := fetchExportedBatchFromAuthoritativeSource(ctx, expectedPlcUrl, expectedCursor, tx.Arguments.Count) 141 + if err != nil { 142 + // returning an actual error like this means "consensus failure". Probably not the best way to deal with this, we would rather drop the transaction if not all nodes can fetch the same thing 143 + // TODO investigate 144 + return nil, stacktrace.Propagate(err, "") 145 + } 146 + 147 + expectedHashBytes, err := computeLogEntriesHash(operations) 148 + if err != nil { 149 + return nil, stacktrace.Propagate(err, "") 150 + } 151 + 152 + if hex.EncodeToString(expectedHashBytes) != tx.Arguments.Hash { 153 + return &processResult{ 154 + Code: 4112, 155 + Info: "Unexpected import hash", 156 + }, nil 157 + } 158 + 159 + if execute { 160 + tree, err := deps.tree.MutableTree() 161 + if err != nil { 162 + return nil, stacktrace.Propagate(err, "") 163 + } 164 + 165 + var client didplc.Client 166 + for _, entry := range operations { 167 + err := deps.plc.ImportOperationFromAuthoritativeSource(ctx, entry, func() ([]didplc.LogEntry, error) { 168 + // TODO Oh NOOOOOOO! This is not deterministic 169 + // (fetched at different times, the audit log might grow, therefore we'll fetch and insert more ops, and change the apphash) 170 + // we need to either limit how much audit log we return (only doable if how much was fetched for each op was part of the tx, ugh) 171 + // or (probably preferred approach) make it so that the ImportOperationFromAuthoritativeSource / ReplaceHistory function only replaces up until the CID that's being imported, and no further ops 172 + // Even then there is a problem: what if the nullified status changes between imports :dizzy_face: 173 + // (can the nullified status change for the ops that are being imported only up until CID? Need to think) 174 + e, err := client.AuditLog(ctx, entry.DID) 175 + return e, stacktrace.Propagate(err, "") 176 + }) 177 + if err != nil { 178 + return nil, stacktrace.Propagate(err, "") 179 + } 180 + } 181 + err = store.Tree.SetAuthoritativeImportProgress(tree, newCursor) 182 + if err != nil { 183 + return nil, stacktrace.Propagate(err, "") 184 + } 185 + } 186 + 187 + // TODO finish implementation 188 + // 1. if execute is true: actually import the operations 189 + // 2. if execute is true: update AuthoritativeImportProgress 190 + 191 + return &processResult{ 192 + IsAuthoritativeImportTransaction: true, 193 + Code: 0, 194 + }, stacktrace.NewError("not implemented") 195 }
+46 -5
store/tree.go
··· 29 ExportOperations(tree ReadOnlyTree, after uint64, count int) ([]types.SequencedLogEntry, error) // passing a count of zero means unlimited 30 StoreOperation(tree *iavl.MutableTree, entry didplc.LogEntry, nullifyWithIndexEqualOrGreaterThan mo.Option[int]) error 31 ReplaceHistory(tree *iavl.MutableTree, history []didplc.LogEntry) error 32 } 33 34 var _ PLCTreeStore = (*TreeStore)(nil) ··· 39 func (t *TreeStore) AuditLog(tree ReadOnlyTree, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) { 40 proofs := []*ics23.CommitmentProof{} 41 42 - didBytes, err := didToBytes(did) 43 if err != nil { 44 return nil, nil, stacktrace.Propagate(err, "") 45 } ··· 106 107 func (t *TreeStore) AuditLogReverseIterator(tree ReadOnlyTree, did string, retErr *error) iter.Seq2[int, types.SequencedLogEntry] { 108 return func(yield func(int, types.SequencedLogEntry) bool) { 109 - didBytes, err := didToBytes(did) 110 if err != nil { 111 *retErr = stacktrace.Propagate(err, "") 112 return ··· 179 } 180 181 func (t *TreeStore) StoreOperation(tree *iavl.MutableTree, entry didplc.LogEntry, nullifyWithIndexEqualOrGreaterThan mo.Option[int]) error { 182 - didBytes, err := didToBytes(entry.DID) 183 if err != nil { 184 return stacktrace.Propagate(err, "") 185 } ··· 251 252 did := remoteHistory[0].DID 253 254 - didBytes, err := didToBytes(did) 255 if err != nil { 256 return stacktrace.Propagate(err, "") 257 } ··· 361 return seq + 1, stacktrace.Propagate(err, "") 362 } 363 364 - func didToBytes(did string) ([]byte, error) { 365 if !strings.HasPrefix(did, "did:plc:") { 366 return nil, stacktrace.NewError("invalid did:plc") 367 } ··· 523 })). 524 Complete()) 525 }
··· 29 ExportOperations(tree ReadOnlyTree, after uint64, count int) ([]types.SequencedLogEntry, error) // passing a count of zero means unlimited 30 StoreOperation(tree *iavl.MutableTree, entry didplc.LogEntry, nullifyWithIndexEqualOrGreaterThan mo.Option[int]) error 31 ReplaceHistory(tree *iavl.MutableTree, history []didplc.LogEntry) error 32 + 33 + AuthoritativePLC(tree ReadOnlyTree) (string, error) 34 + SetAuthoritativePLC(tree *iavl.MutableTree, url string) error 35 + 36 + AuthoritativeImportProgress(tree ReadOnlyTree) (uint64, error) 37 + SetAuthoritativeImportProgress(tree *iavl.MutableTree, nextCursor uint64) error 38 } 39 40 var _ PLCTreeStore = (*TreeStore)(nil) ··· 45 func (t *TreeStore) AuditLog(tree ReadOnlyTree, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) { 46 proofs := []*ics23.CommitmentProof{} 47 48 + didBytes, err := DIDToBytes(did) 49 if err != nil { 50 return nil, nil, stacktrace.Propagate(err, "") 51 } ··· 112 113 func (t *TreeStore) AuditLogReverseIterator(tree ReadOnlyTree, did string, retErr *error) iter.Seq2[int, types.SequencedLogEntry] { 114 return func(yield func(int, types.SequencedLogEntry) bool) { 115 + didBytes, err := DIDToBytes(did) 116 if err != nil { 117 *retErr = stacktrace.Propagate(err, "") 118 return ··· 185 } 186 187 func (t *TreeStore) StoreOperation(tree *iavl.MutableTree, entry didplc.LogEntry, nullifyWithIndexEqualOrGreaterThan mo.Option[int]) error { 188 + didBytes, err := DIDToBytes(entry.DID) 189 if err != nil { 190 return stacktrace.Propagate(err, "") 191 } ··· 257 258 did := remoteHistory[0].DID 259 260 + didBytes, err := DIDToBytes(did) 261 if err != nil { 262 return stacktrace.Propagate(err, "") 263 } ··· 367 return seq + 1, stacktrace.Propagate(err, "") 368 } 369 370 + func DIDToBytes(did string) ([]byte, error) { 371 if !strings.HasPrefix(did, "did:plc:") { 372 return nil, stacktrace.NewError("invalid did:plc") 373 } ··· 529 })). 530 Complete()) 531 } 532 + 533 + func (t *TreeStore) AuthoritativePLC(tree ReadOnlyTree) (string, error) { 534 + url, err := tree.Get([]byte("aPLCURL")) 535 + if err != nil { 536 + return "", stacktrace.Propagate(err, "") 537 + } 538 + if url == nil { 539 + return "", nil 540 + } 541 + return string(url), nil 542 + } 543 + 544 + func (t *TreeStore) SetAuthoritativePLC(tree *iavl.MutableTree, url string) error { 545 + _, err := tree.Set([]byte("aPLCURL"), []byte(url)) 546 + return stacktrace.Propagate(err, "") 547 + } 548 + 549 + func (t *TreeStore) AuthoritativeImportProgress(tree ReadOnlyTree) (uint64, error) { 550 + progBytes, err := tree.Get([]byte("aImportProgress")) 551 + if err != nil { 552 + return 0, stacktrace.Propagate(err, "") 553 + } 554 + if len(progBytes) != 8 { 555 + return 0, nil 556 + } 557 + return binary.BigEndian.Uint64(progBytes), nil 558 + } 559 + 560 + func (t *TreeStore) SetAuthoritativeImportProgress(tree *iavl.MutableTree, nextCursor uint64) error { 561 + value := make([]byte, 8) 562 + binary.BigEndian.PutUint64(value, nextCursor) 563 + 564 + _, err := tree.Set([]byte("aImportProgress"), value) 565 + return stacktrace.Propagate(err, "") 566 + }