A very experimental PLC implementation which uses BFT consensus for decentralization

Performance improvements

- Eager async fetch of entries to import
- Drop old tree history (improves situation but it's not a permanent solution, more investigation required) with async iavl tree pruning
- Skip executing import on proposal preparation
- Effectively enforce timeouts through proper context propagation and checking
- Iterate mutable tree with correct iterator when obtaining latest sequence value
- Avoid repeated lock acquisition when importing multiple operations

gbl08ma.com 096b0f2c f36f0997

verified
+4 -1
abciapp/app.go
··· 1 1 package abciapp 2 2 3 3 import ( 4 + "context" 4 5 "fmt" 5 6 "os" 6 7 "path/filepath" ··· 18 19 ) 19 20 20 21 type DIDPLCApplication struct { 22 + runnerContext context.Context 21 23 plc plc.PLC 22 24 tree *iavl.MutableTree 23 25 fullyClearTree func() error ··· 35 37 func NewDIDPLCApplication(badgerDB *badger.DB, snapshotDirectory string) (*DIDPLCApplication, plc.PLC, func(), error) { 36 38 treePrefix := []byte{} 37 39 mkTree := func() *iavl.MutableTree { 38 - return iavl.NewMutableTree(badgeradapter.AdaptBadger(badgerDB, treePrefix), 2048, false, iavl.NewNopLogger()) 40 + return iavl.NewMutableTree(badgeradapter.AdaptBadger(badgerDB, treePrefix), 2048, false, iavl.NewNopLogger(), iavl.AsyncPruningOption(true)) 39 41 } 40 42 41 43 tree := mkTree() ··· 51 53 } 52 54 53 55 d := &DIDPLCApplication{ 56 + runnerContext: context.Background(), 54 57 tree: tree, 55 58 snapshotDirectory: snapshotDirectory, 56 59 aocsByPLC: make(map[string]*authoritativeOperationsCache),
+24 -7
abciapp/execution.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 + "fmt" 6 7 "slices" 7 8 "time" 8 9 ··· 83 84 if totalSize+len(maybeTx) < int(req.MaxTxBytes)-4096 { 84 85 // we have space to fit the import transaction 85 86 86 - result, err := processTx(ctx, d.transactionProcessorDependencies(), maybeTx, req.Time, true) 87 + // set execute to false to save a lot of time 88 + // (we trust that running the import will succeed, so just do bare minimum checks here) 89 + result, err := processTx(ctx, d.transactionProcessorDependencies(), maybeTx, req.Time, false) 87 90 if err != nil { 88 91 return nil, stacktrace.Propagate(err, "") 89 92 } ··· 127 130 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 128 131 } 129 132 133 + st := time.Now() 130 134 result, err = finishProcessTx(ctx, d.transactionProcessorDependencies(), processor, tx, req.Time, true) 131 135 if err != nil { 132 136 return nil, stacktrace.Propagate(err, "") 133 137 } 138 + fmt.Println("FINISHPROCESSTX TOOK", time.Since(st)) 134 139 } 135 140 136 141 // when preparing a proposal, invalid transactions should have been discarded ··· 198 203 199 204 // Commit implements [types.Application]. 200 205 func (d *DIDPLCApplication) Commit(context.Context, *abcitypes.RequestCommit) (*abcitypes.ResponseCommit, error) { 201 - _, _, err := d.tree.SaveVersion() 206 + _, newVersion, err := d.tree.SaveVersion() 202 207 if err != nil { 203 208 return nil, stacktrace.Propagate(err, "") 204 209 } ··· 209 214 } 210 215 } 211 216 212 - // TODO(later) consider whether we can set some RetainHeight in the response 213 - return &abcitypes.ResponseCommit{}, nil 217 + minHeightToKeep := max(newVersion-100, 0) 218 + minVerToKeep := max(minHeightToKeep-5, 0) 219 + if minVerToKeep > 0 { 220 + err = d.tree.DeleteVersionsTo(minVerToKeep) 221 + if err != nil { 222 + return nil, stacktrace.Propagate(err, "") 223 + } 224 + } 225 + 226 + return &abcitypes.ResponseCommit{ 227 + // TODO only discard actual blockchain history based on settings 228 + //RetainHeight: minHeightToKeep, 229 + }, nil 214 230 } 215 231 216 232 func (d *DIDPLCApplication) transactionProcessorDependencies() TransactionProcessorDependencies { 217 233 return TransactionProcessorDependencies{ 218 - plc: d.plc, 219 - tree: d, 220 - aocsByPLC: d.aocsByPLC, 234 + runnerContext: d.runnerContext, 235 + plc: d.plc, 236 + tree: d, 237 + aocsByPLC: d.aocsByPLC, 221 238 } 222 239 }
+39 -8
abciapp/import.go
··· 22 22 "tangled.org/gbl08ma.com/didplcbft/store" 23 23 ) 24 24 25 + const EagerFetchMaxOps = 10000 26 + const OpsPerImportTx = 1000 27 + const OpsPerEagerFetch = 1000 28 + 25 29 type authoritativeOperationsCache struct { 26 30 mu sync.Mutex 27 31 28 - plcURL string 29 - operations map[uint64]logEntryWithSeq 32 + plcURL string 33 + operations map[uint64]logEntryWithSeq 34 + highestFetchedHeight uint64 30 35 } 31 36 32 37 type logEntryWithSeq struct { ··· 34 39 Seq uint64 `json:"seq"` 35 40 } 36 41 37 - func newAuthoritativeOperationsCache(plc string) *authoritativeOperationsCache { 38 - return &authoritativeOperationsCache{ 42 + func newAuthoritativeOperationsCache(ctx context.Context, plc string) *authoritativeOperationsCache { 43 + aoc := &authoritativeOperationsCache{ 39 44 plcURL: plc, 40 45 operations: make(map[uint64]logEntryWithSeq), 41 46 } 47 + 48 + go func() { 49 + ticker := time.NewTicker(500 * time.Millisecond) 50 + for { 51 + select { 52 + case <-ctx.Done(): 53 + return 54 + case <-ticker.C: 55 + aoc.eagerlyFetch(ctx) 56 + } 57 + } 58 + }() 59 + 60 + return aoc 42 61 } 43 62 44 - func getOrCreateAuthoritativeOperationsCache(aocsByPLC map[string]*authoritativeOperationsCache, plc string) *authoritativeOperationsCache { 63 + func getOrCreateAuthoritativeOperationsCache(ctx context.Context, aocsByPLC map[string]*authoritativeOperationsCache, plc string) *authoritativeOperationsCache { 45 64 aoc, ok := aocsByPLC[plc] 46 65 if !ok { 47 - aoc = newAuthoritativeOperationsCache(plc) 66 + aoc = newAuthoritativeOperationsCache(ctx, plc) 48 67 aocsByPLC[plc] = aoc 49 68 } 50 69 return aoc 51 70 } 52 71 72 + func (a *authoritativeOperationsCache) eagerlyFetch(ctx context.Context) { 73 + a.mu.Lock() 74 + defer a.mu.Unlock() 75 + 76 + curOps := len(a.operations) 77 + if curOps >= EagerFetchMaxOps { 78 + return 79 + } 80 + _, _ = a.fetchInMutex(ctx, a.highestFetchedHeight, OpsPerEagerFetch) 81 + } 82 + 53 83 func (a *authoritativeOperationsCache) dropSeqBelowOrEqual(highestCommittedSeq uint64) { 54 84 a.mu.Lock() 55 85 defer a.mu.Unlock() ··· 69 99 70 100 for _, entry := range entries { 71 101 a.operations[entry.Seq] = entry 102 + a.highestFetchedHeight = max(a.highestFetchedHeight, entry.Seq) 72 103 } 73 104 return uint64(len(entries)) < count, nil 74 105 } ··· 239 270 return nil, stacktrace.Propagate(err, "") 240 271 } 241 272 242 - aoc := getOrCreateAuthoritativeOperationsCache(d.aocsByPLC, plcURL) 273 + aoc := getOrCreateAuthoritativeOperationsCache(d.runnerContext, d.aocsByPLC, plcURL) 243 274 244 - entries, err := aoc.get(ctx, cursor, 1000) 275 + entries, err := aoc.get(ctx, cursor, OpsPerImportTx) 245 276 if err != nil { 246 277 return nil, stacktrace.Propagate(err, "") 247 278 }
+4 -3
abciapp/tx.go
··· 18 18 type TransactionAction string 19 19 20 20 type TransactionProcessorDependencies struct { 21 - plc plc.PLC 22 - tree plc.TreeProvider // TODO maybe we should move the TreeProvider definition out of the plc package then? 23 - aocsByPLC map[string]*authoritativeOperationsCache 21 + runnerContext context.Context 22 + plc plc.PLC 23 + tree plc.TreeProvider // TODO maybe we should move the TreeProvider definition out of the plc package then? 24 + aocsByPLC map[string]*authoritativeOperationsCache 24 25 } 25 26 26 27 type TransactionProcessor func(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte, atTime time.Time, execute bool) (*processResult, error)
+10 -7
abciapp/tx_import.go
··· 6 6 "net/url" 7 7 "time" 8 8 9 + "github.com/did-method-plc/go-didplc" 9 10 cbornode "github.com/ipfs/go-ipld-cbor" 10 11 "github.com/palantir/stacktrace" 12 + "github.com/samber/lo" 11 13 "tangled.org/gbl08ma.com/didplcbft/plc" 12 14 "tangled.org/gbl08ma.com/didplcbft/store" 13 15 ) ··· 122 124 }, nil 123 125 } 124 126 125 - aoc := getOrCreateAuthoritativeOperationsCache(deps.aocsByPLC, expectedPlcUrl) 127 + aoc := getOrCreateAuthoritativeOperationsCache(deps.runnerContext, deps.aocsByPLC, expectedPlcUrl) 126 128 127 129 expectedCursor, err := store.Tree.AuthoritativeImportProgress(roTree) 128 130 if err != nil { ··· 169 171 } 170 172 171 173 if execute { 174 + err := deps.plc.ImportOperationsFromAuthoritativeSource(ctx, lo.Map(operations, func(l logEntryWithSeq, _ int) didplc.LogEntry { 175 + return l.LogEntry 176 + })) 177 + if err != nil { 178 + return nil, stacktrace.Propagate(err, "") 179 + } 180 + 172 181 tree, err := deps.tree.MutableTree() 173 182 if err != nil { 174 183 return nil, stacktrace.Propagate(err, "") 175 184 } 176 185 177 - for _, entry := range operations { 178 - err := deps.plc.ImportOperationFromAuthoritativeSource(ctx, entry.LogEntry) 179 - if err != nil { 180 - return nil, stacktrace.Propagate(err, "") 181 - } 182 - } 183 186 err = store.Tree.SetAuthoritativeImportProgress(tree, newCursor) 184 187 if err != nil { 185 188 return nil, stacktrace.Propagate(err, "")
+17 -1
httpapi/server.go
··· 8 8 "log" 9 9 "net" 10 10 "net/http" 11 + "net/http/pprof" 11 12 "slices" 12 13 "strconv" 13 14 "strings" ··· 82 83 s.router.HandleFunc("GET /{did}/log/last", s.makeDIDHandler(s.handleGetLastOp)) 83 84 s.router.HandleFunc("GET /{did}/data", s.makeDIDHandler(s.handleGetPLCData)) 84 85 s.router.HandleFunc("GET /export", s.handleExport) 86 + 87 + s.router.HandleFunc("/debug/pprof/", pprof.Index) 88 + s.router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) 89 + s.router.HandleFunc("/debug/pprof/profile", pprof.Profile) 90 + s.router.HandleFunc("/debug/pprof/symbol", pprof.Symbol) 91 + s.router.HandleFunc("/debug/pprof/trace", pprof.Trace) 92 + 93 + // Register handlers for specific profiles 94 + s.router.Handle("/debug/pprof/heap", pprof.Handler("heap")) 95 + s.router.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine")) 96 + s.router.Handle("/debug/pprof/block", pprof.Handler("block")) 97 + s.router.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate")) 85 98 } 86 99 87 100 // makeDIDHandler creates a wrapper handler that extracts DID from URL path ··· 306 319 return 307 320 } 308 321 309 - // TODO limit count to 1000 (for debugging it's more useful without limit) 322 + if count > 1000 { 323 + sendErrorResponse(w, http.StatusBadRequest, "Invalid count parameter") 324 + return 325 + } 310 326 } 311 327 312 328 afterStr := query.Get("after")
+3 -2
main.go
··· 56 56 badgerDB, err := badger.Open(badger. 57 57 DefaultOptions(badgerDBPath). 58 58 WithBlockSize(8 * 1024). 59 - WithMemTableSize(256 << 20). 59 + WithNumMemtables(3). 60 + WithNumLevelZeroTables(3). 60 61 WithCompression(options.ZSTD)) 61 62 if err != nil { 62 63 log.Fatalf("Opening badger database: %v", err) ··· 138 139 }() 139 140 140 141 if config.PLC.ListenAddress != "" { 141 - plcAPIServer, err := httpapi.NewServer(plc, node, config.PLC.ListenAddress, 15*time.Second) 142 + plcAPIServer, err := httpapi.NewServer(plc, node, config.PLC.ListenAddress, 30*time.Second) 142 143 if err != nil { 143 144 log.Fatalf("Creating PLC API server: %v", err) 144 145 }
+36 -13
plc/impl.go
··· 47 47 timestamp := syntax.Datetime(at.Format(types.ActualAtprotoDatetimeLayout)) 48 48 49 49 // TODO set true to false only while importing old ops 50 - _, err := plc.validator.Validate(atHeight, timestamp, did, opBytes, true) 50 + _, err := plc.validator.Validate(ctx, atHeight, timestamp, did, opBytes, true) 51 51 if err != nil { 52 52 return stacktrace.Propagate(err, "operation failed validation") 53 53 } ··· 62 62 timestamp := syntax.Datetime(t.Format(types.ActualAtprotoDatetimeLayout)) 63 63 64 64 // TODO set true to false only while importing old ops 65 - effects, err := plc.validator.Validate(WorkingTreeVersion, timestamp, did, opBytes, true) 65 + effects, err := plc.validator.Validate(ctx, WorkingTreeVersion, timestamp, did, opBytes, true) 66 66 if err != nil { 67 67 return stacktrace.Propagate(err, "operation failed validation") 68 68 } ··· 80 80 return nil 81 81 } 82 82 83 + func (plc *plcImpl) ImportOperationsFromAuthoritativeSource(ctx context.Context, newEntries []didplc.LogEntry) error { 84 + plc.mu.Lock() 85 + defer plc.mu.Unlock() 86 + 87 + tree, err := plc.treeProvider.MutableTree() 88 + if err != nil { 89 + return stacktrace.Propagate(err, "failed to obtain mutable tree") 90 + } 91 + 92 + for _, entry := range newEntries { 93 + err := plc.importOp(ctx, tree, entry) 94 + if err != nil { 95 + return stacktrace.Propagate(err, "") 96 + } 97 + } 98 + 99 + return nil 100 + } 101 + 83 102 func (plc *plcImpl) ImportOperationFromAuthoritativeSource(ctx context.Context, newEntry didplc.LogEntry) error { 84 103 plc.mu.Lock() 85 104 defer plc.mu.Unlock() ··· 89 108 return stacktrace.Propagate(err, "failed to obtain mutable tree") 90 109 } 91 110 111 + return stacktrace.Propagate(plc.importOp(ctx, tree, newEntry), "") 112 + } 113 + 114 + func (plc *plcImpl) importOp(ctx context.Context, tree *iavl.MutableTree, newEntry didplc.LogEntry) error { 92 115 newCID := newEntry.CID 93 116 newPrev := newEntry.Operation.AsOperation().PrevCIDStr() 94 117 95 118 mostRecentOpIndex := -1 96 119 indexOfPrev := -1 97 120 var iteratorErr error 98 - for entryIdx, entry := range store.Tree.AuditLogReverseIterator(tree, newEntry.DID, &iteratorErr) { 121 + for entryIdx, entry := range store.Tree.AuditLogReverseIterator(ctx, tree, newEntry.DID, &iteratorErr) { 99 122 entryCID := entry.CID.String() 100 123 if mostRecentOpIndex == -1 { 101 124 mostRecentOpIndex = entryIdx ··· 140 163 // there's nothing to do but store the operation, no nullification involved 141 164 newEntry.Nullified = false 142 165 143 - err = store.Tree.StoreOperation(tree, newEntry, nullifiedEntriesStartingIndex) 166 + err := store.Tree.StoreOperation(tree, newEntry, nullifiedEntriesStartingIndex) 144 167 return stacktrace.Propagate(err, "failed to commit operation") 145 168 } 146 169 ··· 157 180 } 158 181 159 182 newEntry.Nullified = false 160 - err = store.Tree.StoreOperation(tree, newEntry, nullifiedEntriesStartingIndex) 183 + err := store.Tree.StoreOperation(tree, newEntry, nullifiedEntriesStartingIndex) 161 184 return stacktrace.Propagate(err, "failed to commit operation") 162 185 } 163 186 ··· 170 193 return didplc.Doc{}, stacktrace.Propagate(err, "failed to obtain immutable tree") 171 194 } 172 195 173 - l, _, err := store.Tree.AuditLog(tree, did, false) 196 + l, _, err := store.Tree.AuditLog(ctx, tree, did, false) 174 197 if err != nil { 175 198 return didplc.Doc{}, stacktrace.Propagate(err, "") 176 199 } ··· 199 222 return nil, stacktrace.Propagate(err, "failed to obtain immutable tree") 200 223 } 201 224 202 - l, _, err := store.Tree.AuditLog(tree, did, false) 225 + l, _, err := store.Tree.AuditLog(ctx, tree, did, false) 203 226 if err != nil { 204 227 return nil, stacktrace.Propagate(err, "") 205 228 } ··· 229 252 return nil, stacktrace.Propagate(err, "failed to obtain immutable tree") 230 253 } 231 254 232 - l, _, err := store.Tree.AuditLog(tree, did, false) 255 + l, _, err := store.Tree.AuditLog(ctx, tree, did, false) 233 256 if err != nil { 234 257 return nil, stacktrace.Propagate(err, "") 235 258 } ··· 255 278 return didplc.OpEnum{}, stacktrace.Propagate(err, "failed to obtain immutable tree") 256 279 } 257 280 258 - l, _, err := store.Tree.AuditLog(tree, did, false) 281 + l, _, err := store.Tree.AuditLog(ctx, tree, did, false) 259 282 if err != nil { 260 283 return didplc.OpEnum{}, stacktrace.Propagate(err, "") 261 284 } ··· 279 302 return didplc.RegularOp{}, stacktrace.Propagate(err, "failed to obtain immutable tree") 280 303 } 281 304 282 - l, _, err := store.Tree.AuditLog(tree, did, false) 305 + l, _, err := store.Tree.AuditLog(ctx, tree, did, false) 283 306 if err != nil { 284 307 return didplc.RegularOp{}, stacktrace.Propagate(err, "") 285 308 } ··· 308 331 return nil, stacktrace.Propagate(err, "failed to obtain immutable tree") 309 332 } 310 333 311 - entries, err := store.Tree.ExportOperations(tree, after, count) 334 + entries, err := store.Tree.ExportOperations(ctx, tree, after, count) 312 335 return entries, stacktrace.Propagate(err, "") 313 336 } 314 337 ··· 316 339 plc *plcImpl 317 340 } 318 341 319 - func (a *inMemoryAuditLogFetcher) AuditLogReverseIterator(atHeight TreeVersion, did string, retErr *error) iter.Seq2[int, types.SequencedLogEntry] { 342 + func (a *inMemoryAuditLogFetcher) AuditLogReverseIterator(ctx context.Context, atHeight TreeVersion, did string, retErr *error) iter.Seq2[int, types.SequencedLogEntry] { 320 343 tree, err := a.plc.treeProvider.ImmutableTree(atHeight) 321 344 if err != nil { 322 345 *retErr = stacktrace.Propagate(err, "") 323 346 return func(yield func(int, types.SequencedLogEntry) bool) {} 324 347 } 325 348 326 - return store.Tree.AuditLogReverseIterator(tree, did, retErr) 349 + return store.Tree.AuditLogReverseIterator(ctx, tree, did, retErr) 327 350 }
+7 -6
plc/operation_validator.go
··· 1 1 package plc 2 2 3 3 import ( 4 + "context" 4 5 "errors" 5 6 "iter" 6 7 "strings" ··· 16 17 17 18 type AuditLogFetcher interface { 18 19 // AuditLogReverseIterator should return an iterator over the list of log entries for the specified DID, in reverse 19 - AuditLogReverseIterator(atHeight TreeVersion, did string, err *error) iter.Seq2[int, types.SequencedLogEntry] 20 + AuditLogReverseIterator(ctx context.Context, atHeight TreeVersion, did string, err *error) iter.Seq2[int, types.SequencedLogEntry] 20 21 } 21 22 22 23 type V0OperationValidator struct { ··· 35 36 } 36 37 37 38 // Validate returns the new complete AuditLog that the DID history would assume if validation passes, and an error if it doesn't pass 38 - func (v *V0OperationValidator) Validate(atHeight TreeVersion, timestamp syntax.Datetime, expectedDid string, opBytes []byte, laxChecking bool) (OperationEffects, error) { 39 + func (v *V0OperationValidator) Validate(ctx context.Context, atHeight TreeVersion, timestamp syntax.Datetime, expectedDid string, opBytes []byte, laxChecking bool) (OperationEffects, error) { 39 40 opEnum, op, err := unmarshalOp(opBytes) 40 41 if err != nil { 41 42 return OperationEffects{}, stacktrace.Propagate(errors.Join(ErrMalformedOperation, err), "") ··· 79 80 mostRecentOpIndex := -1 80 81 indexOfPrev := -1 81 82 var iteratorErr error 82 - for entryIdx, entry := range v.auditLogFetcher.AuditLogReverseIterator(atHeight, expectedDid, &iteratorErr) { 83 + for entryIdx, entry := range v.auditLogFetcher.AuditLogReverseIterator(ctx, atHeight, expectedDid, &iteratorErr) { 83 84 partialLog[entryIdx] = entry 84 85 if mostRecentOpIndex == -1 { 85 86 mostRecentOpIndex = entryIdx ··· 178 179 if len(nullifiedEntries) == 0 { 179 180 // (see prior note on september27Of2023) 180 181 if !laxChecking && timestamp.Time().After(september29Of2023) { 181 - err = v.EnforceOpsRateLimit(atHeight, expectedDid, timestamp.Time()) 182 + err = v.EnforceOpsRateLimit(ctx, atHeight, expectedDid, timestamp.Time()) 182 183 if err != nil { 183 184 return OperationEffects{}, stacktrace.Propagate(err, "") 184 185 } ··· 213 214 ) 214 215 215 216 // EnforceOpsRateLimit is ported from the TypeScript enforceOpsRateLimit function, adapted to not require fetching the entire log 216 - func (v *V0OperationValidator) EnforceOpsRateLimit(atHeight TreeVersion, did string, newOperationTimestamp time.Time) error { 217 + func (v *V0OperationValidator) EnforceOpsRateLimit(ctx context.Context, atHeight TreeVersion, did string, newOperationTimestamp time.Time) error { 217 218 hourAgo := newOperationTimestamp.Add(-time.Hour) 218 219 dayAgo := newOperationTimestamp.Add(-24 * time.Hour) 219 220 weekAgo := newOperationTimestamp.Add(-7 * 24 * time.Hour) 220 221 221 222 var withinHour, withinDay, withinWeek int 222 223 var err error 223 - for _, entry := range v.auditLogFetcher.AuditLogReverseIterator(atHeight, did, &err) { 224 + for _, entry := range v.auditLogFetcher.AuditLogReverseIterator(ctx, atHeight, did, &err) { 224 225 if entry.Nullified { 225 226 // The typescript implementation operates over a `ops` array which doesn't include nullified ops 226 227 // (With recovery ops also skipping rate limits, doesn't this leave the PLC vulnerable to the spam of constant recovery operations? TODO investigate)
+2 -1
plc/plc.go
··· 46 46 } 47 47 48 48 type OperationValidator interface { 49 - Validate(atHeight TreeVersion, timestamp syntax.Datetime, expectedDid string, opBytes []byte, allowLegacy bool) (OperationEffects, error) 49 + Validate(ctx context.Context, atHeight TreeVersion, timestamp syntax.Datetime, expectedDid string, opBytes []byte, allowLegacy bool) (OperationEffects, error) 50 50 } 51 51 52 52 type PLC interface { ··· 67 67 type WritePLC interface { 68 68 ExecuteOperation(ctx context.Context, timestamp time.Time, did string, opBytes []byte) error 69 69 ImportOperationFromAuthoritativeSource(ctx context.Context, entry didplc.LogEntry) error 70 + ImportOperationsFromAuthoritativeSource(ctx context.Context, entries []didplc.LogEntry) error 70 71 }
+51 -36
store/tree.go
··· 1 1 package store 2 2 3 3 import ( 4 + "context" 4 5 "encoding/base32" 5 6 "encoding/binary" 6 7 "iter" ··· 24 25 var Tree PLCTreeStore = &TreeStore{} 25 26 26 27 type PLCTreeStore interface { 27 - AuditLog(tree ReadOnlyTree, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) 28 - AuditLogReverseIterator(tree ReadOnlyTree, did string, err *error) iter.Seq2[int, types.SequencedLogEntry] 29 - ExportOperations(tree ReadOnlyTree, after uint64, count int) ([]types.SequencedLogEntry, error) // passing a count of zero means unlimited 28 + AuditLog(ctx context.Context, tree ReadOnlyTree, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) 29 + AuditLogReverseIterator(ctx context.Context, tree ReadOnlyTree, did string, err *error) iter.Seq2[int, types.SequencedLogEntry] 30 + ExportOperations(ctx context.Context, tree ReadOnlyTree, after uint64, count int) ([]types.SequencedLogEntry, error) // passing a count of zero means unlimited 30 31 StoreOperation(tree *iavl.MutableTree, entry didplc.LogEntry, nullifyWithIndexEqualOrGreaterThan mo.Option[int]) error 31 32 SetOperationCreatedAt(tree *iavl.MutableTree, seqID uint64, createdAt time.Time) error 32 33 ··· 42 43 // TreeStore exists just to groups methods nicely 43 44 type TreeStore struct{} 44 45 45 - func (t *TreeStore) AuditLog(tree ReadOnlyTree, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) { 46 + func (t *TreeStore) AuditLog(ctx context.Context, tree ReadOnlyTree, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) { 46 47 proofs := []*ics23.CommitmentProof{} 47 48 48 49 didBytes, err := DIDToBytes(did) ··· 52 53 53 54 logKey := marshalDIDLogKey(didBytes) 54 55 55 - has, err := tree.Has(logKey) 56 + logOperations, err := tree.Get(logKey) 56 57 if err != nil { 57 58 return nil, nil, stacktrace.Propagate(err, "") 58 59 } 59 - 60 - var operationKeys [][]byte 61 - if has { 62 - logOperations, err := tree.Get(logKey) 63 - if err != nil { 64 - return nil, nil, stacktrace.Propagate(err, "") 65 - } 66 - operationKeys = make([][]byte, 0, len(logOperations)/8) 67 - for seqBytes := range slices.Chunk(logOperations, 8) { 68 - operationKeys = append(operationKeys, sequenceBytesToOperationKey(seqBytes)) 69 - } 60 + operationKeys := make([][]byte, 0, len(logOperations)/8) 61 + for seqBytes := range slices.Chunk(logOperations, 8) { 62 + operationKeys = append(operationKeys, sequenceBytesToOperationKey(seqBytes)) 70 63 } 71 64 72 65 if withProof { ··· 79 72 80 73 logEntries := make([]types.SequencedLogEntry, 0, len(operationKeys)) 81 74 for _, opKey := range operationKeys { 75 + select { 76 + case <-ctx.Done(): 77 + return nil, nil, stacktrace.Propagate(ctx.Err(), "") 78 + default: 79 + } 80 + 82 81 operationValue, err := tree.Get(opKey) 83 82 if err != nil { 84 83 return nil, nil, stacktrace.Propagate(err, "") ··· 110 109 return logEntries, combinedProof, nil 111 110 } 112 111 113 - func (t *TreeStore) AuditLogReverseIterator(tree ReadOnlyTree, did string, retErr *error) iter.Seq2[int, types.SequencedLogEntry] { 112 + func (t *TreeStore) AuditLogReverseIterator(ctx context.Context, tree ReadOnlyTree, did string, retErr *error) iter.Seq2[int, types.SequencedLogEntry] { 114 113 return func(yield func(int, types.SequencedLogEntry) bool) { 115 114 didBytes, err := DIDToBytes(did) 116 115 if err != nil { ··· 120 119 121 120 logKey := marshalDIDLogKey(didBytes) 122 121 123 - has, err := tree.Has(logKey) 122 + logOperations, err := tree.Get(logKey) 124 123 if err != nil { 125 124 *retErr = stacktrace.Propagate(err, "") 126 125 return 126 + } 127 + operationKeys := make([][]byte, 0, len(logOperations)/8) 128 + for seqBytes := range slices.Chunk(logOperations, 8) { 129 + operationKeys = append(operationKeys, sequenceBytesToOperationKey(seqBytes)) 127 130 } 128 131 129 - var operationKeys [][]byte 130 - if has { 131 - logOperations, err := tree.Get(logKey) 132 - if err != nil { 133 - *retErr = stacktrace.Propagate(err, "") 132 + for i := len(operationKeys) - 1; i >= 0; i-- { 133 + select { 134 + case <-ctx.Done(): 135 + *retErr = stacktrace.Propagate(ctx.Err(), "") 134 136 return 135 - } 136 - operationKeys = make([][]byte, 0, len(logOperations)/8) 137 - for seqBytes := range slices.Chunk(logOperations, 8) { 138 - operationKeys = append(operationKeys, sequenceBytesToOperationKey(seqBytes)) 137 + default: 139 138 } 140 - } 141 139 142 - for i := len(operationKeys) - 1; i >= 0; i-- { 143 140 opKey := operationKeys[i] 144 141 operationValue, err := tree.Get(opKey) 145 142 if err != nil { ··· 160 157 } 161 158 } 162 159 163 - func (t *TreeStore) ExportOperations(tree ReadOnlyTree, after uint64, count int) ([]types.SequencedLogEntry, error) { 160 + func (t *TreeStore) ExportOperations(ctx context.Context, tree ReadOnlyTree, after uint64, count int) ([]types.SequencedLogEntry, error) { 164 161 // as the name suggests, after is an exclusive lower bound, but our iterators use inclusive lower bounds 165 162 start := after + 1 166 163 startKey := marshalOperationKey(start) ··· 169 166 entries := make([]types.SequencedLogEntry, 0, count) 170 167 var iterErr error 171 168 tree.IterateRange(startKey, endKey, true, func(operationKey, operationValue []byte) bool { 169 + select { 170 + case <-ctx.Done(): 171 + iterErr = stacktrace.Propagate(ctx.Err(), "") 172 + return true 173 + default: 174 + } 175 + 172 176 logEntry, err := unmarshalLogEntry(operationKey, operationValue) 173 177 if err != nil { 174 178 iterErr = stacktrace.Propagate(err, "") ··· 235 239 opKey := marshalOperationKey(seq) 236 240 opValue := marshalOperationValue(entry.Nullified, didBytes, opDatetime.Time(), operation) 237 241 238 - _, err = tree.Set(opKey, opValue) 242 + updated, err := tree.Set(opKey, opValue) 239 243 if err != nil { 240 244 return stacktrace.Propagate(err, "") 241 245 } 246 + if updated { 247 + return stacktrace.NewError("expected to be writing to a new operation key but updated instead") 248 + } 242 249 243 250 logOperations = append(logOperations, opKey[1:9]...) 244 251 _, err = tree.Set(logKey, logOperations) ··· 274 281 275 282 func getNextSeqID(tree *iavl.MutableTree) (uint64, error) { 276 283 seq := uint64(0) 277 - var err error 278 - tree.IterateRange(minOperationKey, maxOperationKey, false, func(key, value []byte) bool { 279 - seq, err = unmarshalOperationKey(key) 280 - return true 281 - }) 282 284 285 + itr, err := tree.Iterator(minOperationKey, maxOperationKey, false) 286 + if err != nil { 287 + return 0, stacktrace.Propagate(err, "") 288 + } 289 + 290 + defer itr.Close() 291 + 292 + if itr.Valid() { 293 + seq, err = unmarshalOperationKey(itr.Key()) 294 + if err != nil { 295 + return 0, stacktrace.Propagate(err, "") 296 + } 297 + } 283 298 return seq + 1, stacktrace.Propagate(err, "") 284 299 } 285 300