A very experimental PLC implementation which uses BFT consensus for decentralization
1package abciapp
2
3import (
4 "context"
5 "encoding/hex"
6 "net/url"
7
8 cbornode "github.com/ipfs/go-ipld-cbor"
9 "github.com/palantir/stacktrace"
10 "tangled.org/gbl08ma.com/didplcbft/store"
11)
12
13var TransactionActionSetAuthoritativePlc = registerTransactionAction[SetAuthoritativePlcArguments]("SetAuthoritativePlc", processSetAuthoritativePlcTx)
14
15type SetAuthoritativePlcArguments struct {
16 PLCURL string `json:"plcURL" refmt:"plcURL"`
17 RestartImport bool `json:"restartImport" refmt:"restartImport"`
18}
19
20func (SetAuthoritativePlcArguments) ForAction() TransactionAction {
21 return TransactionActionSetAuthoritativePlc
22}
23
24func init() {
25 cbornode.RegisterCborType(SetAuthoritativePlcArguments{})
26 cbornode.RegisterCborType(Transaction[SetAuthoritativePlcArguments]{})
27}
28
29func processSetAuthoritativePlcTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte) (*processResult, error) {
30 tx, err := UnmarshalTransaction[SetAuthoritativePlcArguments](txBytes)
31 if err != nil {
32 return &processResult{
33 Code: 4000,
34 Info: err.Error(),
35 }, nil
36 }
37
38 // TODO this transaction must somehow validate that whoever submitted it has the permission to change this
39 // Does it even make sense to keep this operation type as something submitted via the mempool in the long run,
40 // or would it be tied to some sort of proposal/participation system, where the validators submit this operation type in response to some on-chain trigger?
41
42 // A simple solution in the short term might be to just validate a "simple" public-private signature + an expiry timestamp (to prevent replay attacks)
43 // which would both be part of the SetAuthoritativePlcArguments. Very centralized, but very straightforward
44 // (the public key would be part of the config or even hardcoded for good measure)
45
46 if tx.Arguments.PLCURL != "" {
47 parsed, err := url.Parse(tx.Arguments.PLCURL)
48 if err != nil || parsed.Scheme != "https" {
49 return &processResult{
50 Code: 4100,
51 Info: "Malformed Authoritative PLC URL",
52 }, nil
53 }
54 }
55
56 if writeTx, ok := deps.writeTx.Get(); ok {
57 err = store.Tree.SetAuthoritativePLC(writeTx, tx.Arguments.PLCURL)
58 if err != nil {
59 return nil, stacktrace.Propagate(err, "")
60 }
61
62 if tx.Arguments.RestartImport {
63 err = store.Tree.SetAuthoritativeImportProgress(writeTx, 0)
64 if err != nil {
65 return nil, stacktrace.Propagate(err, "")
66 }
67 }
68 }
69
70 return &processResult{
71 Code: 0,
72 }, nil
73}
74
75var TransactionActionAuthoritativeImport = registerTransactionAction[AuthoritativeImportArguments]("AuthoritativeImport", processAuthoritativeImportTx)
76
77type AuthoritativeImportArguments struct {
78 PLCURL string `json:"plcURL" refmt:"plcURL"`
79 Cursor uint64 `json:"cursor" refmt:"cursor"`
80 Count uint64 `json:"count" refmt:"count"`
81 Hash string `json:"hash" refmt:"hash"`
82}
83
84func (AuthoritativeImportArguments) ForAction() TransactionAction {
85 return TransactionActionAuthoritativeImport
86}
87
88func init() {
89 cbornode.RegisterCborType(AuthoritativeImportArguments{})
90 cbornode.RegisterCborType(Transaction[AuthoritativeImportArguments]{})
91}
92
93func processAuthoritativeImportTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte) (*processResult, error) {
94 tx, err := UnmarshalTransaction[AuthoritativeImportArguments](txBytes)
95 if err != nil {
96 return &processResult{
97 Code: 4000,
98 Info: err.Error(),
99 }, nil
100 }
101
102 expectedPlcUrl, err := store.Tree.AuthoritativePLC(deps.readTx)
103 if err != nil {
104 return nil, stacktrace.Propagate(err, "")
105 }
106
107 if expectedPlcUrl != tx.Arguments.PLCURL {
108 return &processResult{
109 Code: 4110,
110 Info: "Unexpected Authoritative PLC URL",
111 }, nil
112 }
113
114 aoc := getOrCreateAuthoritativeOperationsCache(deps.runnerContext, deps.aocsByPLC, expectedPlcUrl)
115
116 expectedCursor, err := store.Tree.AuthoritativeImportProgress(deps.readTx)
117 if err != nil {
118 return nil, stacktrace.Propagate(err, "")
119 }
120
121 if expectedCursor != tx.Arguments.Cursor {
122 return &processResult{
123 Code: 4111,
124 Info: "Unexpected import cursor",
125 }, nil
126 }
127
128 operations, err := aoc.get(ctx, expectedCursor, tx.Arguments.Count)
129 if err != nil {
130 return &processResult{
131 Code: 4112,
132 Info: "Failure to obtain authoritative operations",
133 }, nil
134 }
135
136 if uint64(len(operations)) < tx.Arguments.Count {
137 return &processResult{
138 Code: 4113,
139 Info: "Unexpected import count",
140 }, nil
141 }
142
143 expectedHashBytes, err := computeLogEntriesHash(operations)
144 if err != nil {
145 return nil, stacktrace.Propagate(err, "")
146 }
147
148 if hex.EncodeToString(expectedHashBytes) != tx.Arguments.Hash {
149 return &processResult{
150 Code: 4114,
151 Info: "Unexpected import hash",
152 }, nil
153 }
154
155 newCursor := expectedCursor
156 if len(operations) > 0 {
157 newCursor = operations[len(operations)-1].Seq
158 }
159
160 if writeTx, ok := deps.writeTx.Get(); ok {
161 for _, op := range operations {
162 err := deps.plc.ImportOperationFromAuthoritativeSource(ctx, writeTx, op.LogEntry)
163 if err != nil {
164 return nil, stacktrace.Propagate(err, "")
165 }
166 }
167
168 err = store.Tree.SetAuthoritativeImportProgress(writeTx, newCursor)
169 if err != nil {
170 return nil, stacktrace.Propagate(err, "")
171 }
172 }
173
174 return &processResult{
175 commitSideEffects: []func(){
176 func() {
177 aoc.dropSeqBelowOrEqual(newCursor)
178 },
179 },
180 Code: 0,
181 }, nil
182}