A very experimental PLC implementation which uses BFT consensus for decentralization

Support and consume only sequence-based export

gbl08ma.com 50f5461a 79a3bedc

verified
+23 -7
httpapi/server.go
··· 9 "net" 10 "net/http" 11 "slices" 12 "strings" 13 "sync" 14 "sync/atomic" 15 "time" 16 17 "github.com/bluesky-social/indigo/atproto/atcrypto" 18 - "github.com/bluesky-social/indigo/atproto/syntax" 19 "github.com/cometbft/cometbft/node" 20 "github.com/did-method-plc/go-didplc" 21 "github.com/google/uuid" 22 cbornode "github.com/ipfs/go-ipld-cbor" 23 "github.com/palantir/stacktrace" 24 "github.com/rs/cors" 25 26 "tangled.org/gbl08ma/didplcbft/abciapp" 27 "tangled.org/gbl08ma/didplcbft/plc" ··· 307 } 308 309 afterStr := query.Get("after") 310 - after, err := syntax.ParseDatetime(afterStr) 311 - if err != nil && afterStr != "" { 312 - sendErrorResponse(w, http.StatusBadRequest, "Invalid after parameter") 313 - return 314 } 315 316 - entries, err := s.plc.Export(r.Context(), plc.CommittedTreeVersion, after.Time(), count) 317 if handlePLCError(w, err, "") { 318 return 319 } 320 321 w.Header().Set("Content-Type", "application/jsonlines") 322 for _, entry := range entries { 323 - json.NewEncoder(w).Encode(&entry) 324 } 325 } 326 ··· 342 343 // sendErrorResponse sends an error response with the specified status code and message. 344 func sendErrorResponse(w http.ResponseWriter, statusCode int, message string) { 345 w.WriteHeader(statusCode) 346 json.NewEncoder(w).Encode(map[string]string{"message": message}) 347 }
··· 9 "net" 10 "net/http" 11 "slices" 12 + "strconv" 13 "strings" 14 "sync" 15 "sync/atomic" 16 "time" 17 18 "github.com/bluesky-social/indigo/atproto/atcrypto" 19 "github.com/cometbft/cometbft/node" 20 "github.com/did-method-plc/go-didplc" 21 "github.com/google/uuid" 22 cbornode "github.com/ipfs/go-ipld-cbor" 23 "github.com/palantir/stacktrace" 24 "github.com/rs/cors" 25 + "github.com/samber/lo" 26 27 "tangled.org/gbl08ma/didplcbft/abciapp" 28 "tangled.org/gbl08ma/didplcbft/plc" ··· 308 } 309 310 afterStr := query.Get("after") 311 + after := uint64(0) 312 + var err error 313 + if afterStr != "" { 314 + after, err = strconv.ParseUint(afterStr, 10, 64) 315 + if err != nil { 316 + sendErrorResponse(w, http.StatusBadRequest, "Only sequence-based pagination is supported") 317 + return 318 + } 319 } 320 321 + entries, err := s.plc.Export(r.Context(), plc.CommittedTreeVersion, after, count) 322 if handlePLCError(w, err, "") { 323 return 324 } 325 326 w.Header().Set("Content-Type", "application/jsonlines") 327 + 328 + type jsonEntry struct { 329 + Seq uint64 `json:"seq"` 330 + Type string `json:"type"` 331 + *didplc.LogEntry 332 + } 333 for _, entry := range entries { 334 + json.NewEncoder(w).Encode(jsonEntry{ 335 + Seq: entry.Seq, 336 + Type: "sequenced_op", 337 + LogEntry: lo.ToPtr(entry.ToDIDPLCLogEntry()), 338 + }) 339 } 340 } 341 ··· 357 358 // sendErrorResponse sends an error response with the specified status code and message. 359 func sendErrorResponse(w http.ResponseWriter, statusCode int, message string) { 360 + w.Header().Set("Content-Type", "application/json") 361 w.WriteHeader(statusCode) 362 json.NewEncoder(w).Encode(map[string]string{"message": message}) 363 }
+4 -3
httpapi/server_test.go
··· 13 "github.com/did-method-plc/go-didplc" 14 "github.com/stretchr/testify/require" 15 "tangled.org/gbl08ma/didplcbft/plc" 16 ) 17 18 // MockReadPLC is a mock implementation of the ReadPLC interface for testing. ··· 92 return didplc.RegularOp{}, nil 93 } 94 95 - func (m *MockReadPLC) Export(ctx context.Context, atHeight plc.TreeVersion, after time.Time, count int) ([]didplc.LogEntry, error) { 96 if m.shouldReturnError { 97 - return []didplc.LogEntry{}, fmt.Errorf("internal error") 98 } 99 - return []didplc.LogEntry{}, nil 100 } 101 102 func TestServer(t *testing.T) {
··· 13 "github.com/did-method-plc/go-didplc" 14 "github.com/stretchr/testify/require" 15 "tangled.org/gbl08ma/didplcbft/plc" 16 + "tangled.org/gbl08ma/didplcbft/types" 17 ) 18 19 // MockReadPLC is a mock implementation of the ReadPLC interface for testing. ··· 93 return didplc.RegularOp{}, nil 94 } 95 96 + func (m *MockReadPLC) Export(ctx context.Context, atHeight plc.TreeVersion, after uint64, count int) ([]types.SequencedLogEntry, error) { 97 if m.shouldReturnError { 98 + return []types.SequencedLogEntry{}, fmt.Errorf("internal error") 99 } 100 + return []types.SequencedLogEntry{}, nil 101 } 102 103 func TestServer(t *testing.T) {
+14 -23
importer/importer_test.go
··· 25 ) 26 27 func TestImportV2(t *testing.T) { 28 - c, err := rpchttp.New("http://localhost:26100", "/websocket") 29 require.NoError(t, err) 30 31 ctx := t.Context() ··· 41 var wg sync.WaitGroup 42 noMoreNewEntries := atomic.Bool{} 43 wg.Go(func() { 44 - for entry := range iterateOverExport(ctx, "2023-10-10T00:00:00.000Z") { 45 if totalAwaiting.Size() > 5000 { 46 for totalAwaiting.Size() > 1000 { 47 time.Sleep(1 * time.Second) ··· 184 wg.Wait() 185 } 186 187 - func iterateOverExport(ctx context.Context, startAt string) iter.Seq[didplc.LogEntry] { 188 return func(yield func(didplc.LogEntry) bool) { 189 const batchSize = 1000 190 baseURL := didplc.DefaultDirectoryURL + "/export" 191 client := &http.Client{Timeout: 30 * time.Second} 192 - 193 - // The /export seems to sometimes return outright duplicated entries :weary: 194 - seenCIDs := map[string]struct{}{} 195 196 after := startAt 197 for { ··· 204 205 q := req.URL.Query() 206 q.Add("count", fmt.Sprint(batchSize)) 207 - if after != "" { 208 - q.Add("after", after) 209 - } 210 req.URL.RawQuery = q.Encode() 211 212 resp, err := client.Do(req) ··· 219 return // Non-200 status code 220 } 221 222 - entries := make([]didplc.LogEntry, 0, batchSize) 223 224 // Read response body 225 s := bufio.NewScanner(resp.Body) 226 receivedEntries := 0 227 for s.Scan() { 228 - var entry didplc.LogEntry 229 if err := json.Unmarshal(s.Bytes(), &entry); err != nil { 230 return // Failed to decode JSON 231 } 232 - if _, present := seenCIDs[entry.CID]; !present { 233 - entries = append(entries, entry) 234 - seenCIDs[entry.CID] = struct{}{} 235 - } 236 receivedEntries++ 237 } 238 if s.Err() != nil { ··· 244 } 245 246 // Process each entry 247 - var lastCreatedAt string 248 for _, entry := range entries { 249 - lastCreatedAt = entry.CreatedAt 250 - if !yield(entry) { 251 return 252 } 253 } ··· 255 if receivedEntries < batchSize { 256 return 257 } 258 - 259 - after = lastCreatedAt 260 - 261 - // Small delay to be respectful to the API 262 - time.Sleep(100 * time.Millisecond) 263 } 264 } 265 }
··· 25 ) 26 27 func TestImportV2(t *testing.T) { 28 + c, err := rpchttp.New("http://localhost:26657", "/websocket") 29 require.NoError(t, err) 30 31 ctx := t.Context() ··· 41 var wg sync.WaitGroup 42 noMoreNewEntries := atomic.Bool{} 43 wg.Go(func() { 44 + for entry := range iterateOverExport(ctx, 0) { 45 if totalAwaiting.Size() > 5000 { 46 for totalAwaiting.Size() > 1000 { 47 time.Sleep(1 * time.Second) ··· 184 wg.Wait() 185 } 186 187 + func iterateOverExport(ctx context.Context, startAt uint64) iter.Seq[didplc.LogEntry] { 188 return func(yield func(didplc.LogEntry) bool) { 189 const batchSize = 1000 190 baseURL := didplc.DefaultDirectoryURL + "/export" 191 client := &http.Client{Timeout: 30 * time.Second} 192 193 after := startAt 194 for { ··· 201 202 q := req.URL.Query() 203 q.Add("count", fmt.Sprint(batchSize)) 204 + q.Add("after", fmt.Sprint(after)) 205 req.URL.RawQuery = q.Encode() 206 207 resp, err := client.Do(req) ··· 214 return // Non-200 status code 215 } 216 217 + type logEntryWithSeq struct { 218 + didplc.LogEntry 219 + Seq uint64 `json:"seq"` 220 + } 221 + 222 + entries := make([]logEntryWithSeq, 0, batchSize) 223 224 // Read response body 225 s := bufio.NewScanner(resp.Body) 226 receivedEntries := 0 227 for s.Scan() { 228 + var entry logEntryWithSeq 229 if err := json.Unmarshal(s.Bytes(), &entry); err != nil { 230 return // Failed to decode JSON 231 } 232 + entries = append(entries, entry) 233 receivedEntries++ 234 } 235 if s.Err() != nil { ··· 241 } 242 243 // Process each entry 244 for _, entry := range entries { 245 + after = entry.Seq 246 + if !yield(entry.LogEntry) { 247 return 248 } 249 } ··· 251 if receivedEntries < batchSize { 252 return 253 } 254 } 255 } 256 }
+66 -28
plc/impl.go
··· 13 "github.com/samber/lo" 14 "github.com/samber/mo" 15 "tangled.org/gbl08ma/didplcbft/store" 16 ) 17 18 type TreeProvider interface { ··· 43 plc.mu.Lock() 44 defer plc.mu.Unlock() 45 46 - timestamp := syntax.Datetime(at.Format(store.ActualAtprotoDatetimeLayout)) 47 48 // TODO set true to false only while importing old ops 49 _, err := plc.validator.Validate(atHeight, timestamp, did, opBytes, true) ··· 58 plc.mu.Lock() 59 defer plc.mu.Unlock() 60 61 - timestamp := syntax.Datetime(t.Format(store.ActualAtprotoDatetimeLayout)) 62 63 // TODO set true to false only while importing old ops 64 effects, err := plc.validator.Validate(WorkingTreeVersion, timestamp, did, opBytes, true) ··· 97 newCID := newEntry.CID 98 newPrev := newEntry.Operation.AsOperation().PrevCIDStr() 99 100 - // TODO avoid redundant CreatedAt formating and parsing by using a specialized LogEntry type internally (i.e. between us and the store) 101 newCreatedAtDT, err := syntax.ParseDatetime(newEntry.CreatedAt) 102 if err != nil { 103 return stacktrace.Propagate(err, "") ··· 106 107 mustFullyReplaceHistory := false 108 for _, entry := range l { 109 - existingCreatedAt, err := syntax.ParseDatetime(entry.CreatedAt) 110 - if err != nil { 111 - return stacktrace.Propagate(err, "") 112 - } 113 - if existingCreatedAt.Time().After(newCreatedAt) { 114 // We're trying to import an operation whose timestamp precedes one of the timestamps for operations we already know about 115 // We'll need to discard all known history and import it anew using the authoritative source data (same as when dealing with sequence forks) 116 mustFullyReplaceHistory = true 117 break 118 } 119 120 - if entry.CID == newCID { 121 // If an operation with the same CID already exists -> easy-ish 122 123 // this operation is already present, there is nothing to do ··· 127 } 128 } 129 130 - if len(l) == 0 || (!mustFullyReplaceHistory && l[len(l)-1].CID == newPrev) { 131 // If DID doesn't exist at all -> easy 132 // If prev matches CID of latest operation, and resulting timestamp sequence monotonically increases -> easy 133 err = store.Tree.StoreOperation(tree, newEntry, mo.None[int]()) ··· 166 return didplc.Doc{}, stacktrace.Propagate(ErrDIDNotFound, "") 167 } 168 169 - opEnum := l[len(l)-1].Operation 170 - if opEnum.Tombstone != nil { 171 - return didplc.Doc{}, stacktrace.Propagate(ErrDIDGone, "") 172 } 173 - return opEnum.AsOperation().Doc(did) 174 } 175 176 func (plc *plcImpl) OperationLog(ctx context.Context, atHeight TreeVersion, did string) ([]didplc.OpEnum, error) { ··· 195 return nil, stacktrace.Propagate(ErrDIDNotFound, "") 196 } 197 198 - return lo.Map(l, func(logEntry didplc.LogEntry, _ int) didplc.OpEnum { 199 return logEntry.Operation 200 }), nil 201 } ··· 221 return nil, stacktrace.Propagate(ErrDIDNotFound, "") 222 } 223 224 - return l, nil 225 } 226 227 func (plc *plcImpl) LastOperation(ctx context.Context, atHeight TreeVersion, did string) (didplc.OpEnum, error) { 228 - // GetLastOp - /:did/log/last - latest op from audit log which isn't nullified (isn't the latest op guaranteed to not be nullified?) 229 // if missing -> returns ErrDIDNotFound 230 // if tombstone -> returns tombstone op 231 plc.mu.Lock() ··· 245 return didplc.OpEnum{}, stacktrace.Propagate(ErrDIDNotFound, "") 246 } 247 248 - return l[len(l)-1].Operation, nil 249 } 250 251 func (plc *plcImpl) Data(ctx context.Context, atHeight TreeVersion, did string) (didplc.RegularOp, error) { ··· 269 return didplc.RegularOp{}, stacktrace.Propagate(ErrDIDNotFound, "") 270 } 271 272 - opEnum := l[len(l)-1].Operation 273 - if opEnum.Tombstone != nil { 274 - return didplc.RegularOp{}, stacktrace.Propagate(ErrDIDGone, "") 275 } 276 - if opEnum.Regular != nil { 277 - return *opEnum.Regular, nil 278 - } 279 - return *modernizeOp(opEnum.Legacy), nil 280 } 281 282 - func (plc *plcImpl) Export(ctx context.Context, atHeight TreeVersion, after time.Time, count int) ([]didplc.LogEntry, error) { 283 plc.mu.Lock() 284 defer plc.mu.Unlock() 285 ··· 296 plc *plcImpl 297 } 298 299 - func (a *inMemoryAuditLogFetcher) AuditLogReverseIterator(atHeight TreeVersion, did string, retErr *error) iter.Seq2[int, didplc.LogEntry] { 300 tree, err := a.plc.treeProvider.ImmutableTree(atHeight) 301 if err != nil { 302 *retErr = stacktrace.Propagate(err, "") 303 - return func(yield func(int, didplc.LogEntry) bool) {} 304 } 305 306 return store.Tree.AuditLogReverseIterator(tree, did, retErr)
··· 13 "github.com/samber/lo" 14 "github.com/samber/mo" 15 "tangled.org/gbl08ma/didplcbft/store" 16 + "tangled.org/gbl08ma/didplcbft/types" 17 ) 18 19 type TreeProvider interface { ··· 44 plc.mu.Lock() 45 defer plc.mu.Unlock() 46 47 + timestamp := syntax.Datetime(at.Format(types.ActualAtprotoDatetimeLayout)) 48 49 // TODO set true to false only while importing old ops 50 _, err := plc.validator.Validate(atHeight, timestamp, did, opBytes, true) ··· 59 plc.mu.Lock() 60 defer plc.mu.Unlock() 61 62 + timestamp := syntax.Datetime(t.Format(types.ActualAtprotoDatetimeLayout)) 63 64 // TODO set true to false only while importing old ops 65 effects, err := plc.validator.Validate(WorkingTreeVersion, timestamp, did, opBytes, true) ··· 98 newCID := newEntry.CID 99 newPrev := newEntry.Operation.AsOperation().PrevCIDStr() 100 101 newCreatedAtDT, err := syntax.ParseDatetime(newEntry.CreatedAt) 102 if err != nil { 103 return stacktrace.Propagate(err, "") ··· 106 107 mustFullyReplaceHistory := false 108 for _, entry := range l { 109 + if entry.CreatedAt.After(newCreatedAt) { 110 // We're trying to import an operation whose timestamp precedes one of the timestamps for operations we already know about 111 // We'll need to discard all known history and import it anew using the authoritative source data (same as when dealing with sequence forks) 112 mustFullyReplaceHistory = true 113 break 114 } 115 116 + if entry.CID.String() == newCID && entry.Nullified == newEntry.Nullified { 117 // If an operation with the same CID already exists -> easy-ish 118 119 // this operation is already present, there is nothing to do ··· 123 } 124 } 125 126 + if len(l) == 0 || (!mustFullyReplaceHistory && l[len(l)-1].CID.String() == newPrev) { 127 // If DID doesn't exist at all -> easy 128 // If prev matches CID of latest operation, and resulting timestamp sequence monotonically increases -> easy 129 err = store.Tree.StoreOperation(tree, newEntry, mo.None[int]()) ··· 162 return didplc.Doc{}, stacktrace.Propagate(ErrDIDNotFound, "") 163 } 164 165 + // find most recent operation that isn't nullified (during authoritative import, the latest operation might be nullified) 166 + for i := len(l) - 1; i >= 0; i-- { 167 + opEnum := l[i].Operation 168 + if !l[i].Nullified { 169 + if opEnum.Tombstone != nil { 170 + return didplc.Doc{}, stacktrace.Propagate(ErrDIDGone, "") 171 + } 172 + return opEnum.AsOperation().Doc(did) 173 + } 174 } 175 + // in the worst case all operations are somehow nullified and the loop ends with opEnum holding a nullified operation 176 + // that _shouldn't_ be possible (right?) but if it does happen, let's just behave as if the DID was tombstoned 177 + return didplc.Doc{}, stacktrace.Propagate(ErrDIDGone, "") 178 } 179 180 func (plc *plcImpl) OperationLog(ctx context.Context, atHeight TreeVersion, did string) ([]didplc.OpEnum, error) { ··· 199 return nil, stacktrace.Propagate(ErrDIDNotFound, "") 200 } 201 202 + l = lo.Filter(l, func(logEntry types.SequencedLogEntry, _ int) bool { 203 + return !logEntry.Nullified 204 + }) 205 + 206 + return lo.Map(l, func(logEntry types.SequencedLogEntry, _ int) didplc.OpEnum { 207 return logEntry.Operation 208 }), nil 209 } ··· 229 return nil, stacktrace.Propagate(ErrDIDNotFound, "") 230 } 231 232 + // if the latest operations are nullified (happens while authoritative import is in progress), just pretend we don't have them yet, 233 + // since a properly functioning PLC implementation could never have the latest operation for a DID be nullified 234 + dropAfterIdx := len(l) - 1 235 + for ; dropAfterIdx >= 0; dropAfterIdx-- { 236 + if !l[dropAfterIdx].Nullified { 237 + break 238 + } 239 + } 240 + l = l[0 : dropAfterIdx+1] 241 + 242 + return lo.Map(l, func(logEntry types.SequencedLogEntry, _ int) didplc.LogEntry { 243 + return logEntry.ToDIDPLCLogEntry() 244 + }), nil 245 } 246 247 func (plc *plcImpl) LastOperation(ctx context.Context, atHeight TreeVersion, did string) (didplc.OpEnum, error) { 248 + // GetLastOp - /:did/log/last - latest op from audit log which isn't nullified 249 // if missing -> returns ErrDIDNotFound 250 // if tombstone -> returns tombstone op 251 plc.mu.Lock() ··· 265 return didplc.OpEnum{}, stacktrace.Propagate(ErrDIDNotFound, "") 266 } 267 268 + // find most recent operation that isn't nullified (during authoritative import, the latest operation might be nullified) 269 + for i := len(l) - 1; i >= 0; i-- { 270 + opEnum := l[i].Operation 271 + if !l[i].Nullified { 272 + return opEnum, nil 273 + } 274 + } 275 + // in the worst case all operations are somehow nullified and the loop ends with opEnum holding a nullified operation 276 + // that _shouldn't_ be possible (right?) but if it does happen, let's just behave as if the DID did not exist 277 + return didplc.OpEnum{}, stacktrace.Propagate(ErrDIDNotFound, "") 278 } 279 280 func (plc *plcImpl) Data(ctx context.Context, atHeight TreeVersion, did string) (didplc.RegularOp, error) { ··· 298 return didplc.RegularOp{}, stacktrace.Propagate(ErrDIDNotFound, "") 299 } 300 301 + // find most recent operation that isn't nullified (during authoritative import, the latest operation might be nullified) 302 + for i := len(l) - 1; i >= 0; i-- { 303 + opEnum := l[i].Operation 304 + if !l[i].Nullified { 305 + if opEnum.Tombstone != nil { 306 + return didplc.RegularOp{}, stacktrace.Propagate(ErrDIDGone, "") 307 + } 308 + if opEnum.Regular != nil { 309 + return *opEnum.Regular, nil 310 + } 311 + return *modernizeOp(opEnum.Legacy), nil 312 + } 313 } 314 + // in the worst case all operations are somehow nullified and the loop ends with opEnum holding a nullified operation 315 + // that _shouldn't_ be possible (right?) but if it does happen, let's just behave as if the DID was tombstoned 316 + return didplc.RegularOp{}, stacktrace.Propagate(ErrDIDGone, "") 317 + 318 } 319 320 + func (plc *plcImpl) Export(ctx context.Context, atHeight TreeVersion, after uint64, count int) ([]types.SequencedLogEntry, error) { 321 plc.mu.Lock() 322 defer plc.mu.Unlock() 323 ··· 334 plc *plcImpl 335 } 336 337 + func (a *inMemoryAuditLogFetcher) AuditLogReverseIterator(atHeight TreeVersion, did string, retErr *error) iter.Seq2[int, types.SequencedLogEntry] { 338 tree, err := a.plc.treeProvider.ImmutableTree(atHeight) 339 if err != nil { 340 *retErr = stacktrace.Propagate(err, "") 341 + return func(yield func(int, types.SequencedLogEntry) bool) {} 342 } 343 344 return store.Tree.AuditLogReverseIterator(tree, did, retErr)
+9 -22
plc/operation_validator.go
··· 11 "github.com/did-method-plc/go-didplc" 12 "github.com/palantir/stacktrace" 13 "github.com/samber/mo" 14 ) 15 16 type AuditLogFetcher interface { 17 // AuditLogReverseIterator should return an iterator over the list of log entries for the specified DID, in reverse 18 - AuditLogReverseIterator(atHeight TreeVersion, did string, err *error) iter.Seq2[int, didplc.LogEntry] 19 } 20 21 type V0OperationValidator struct { ··· 74 75 proposedPrev := op.PrevCIDStr() 76 77 - partialLog := make(map[int]didplc.LogEntry) 78 mostRecentOpIndex := -1 79 indexOfPrev := -1 80 var iteratorErr error ··· 88 } 89 } 90 91 - if entry.CID == proposedPrev { 92 indexOfPrev = entryIdx 93 break 94 } ··· 98 return OperationEffects{}, stacktrace.Propagate(iteratorErr, "") 99 } 100 101 - nullifiedEntries := []didplc.LogEntry{} 102 nullifiedEntriesStartingIndex := mo.None[int]() 103 104 if mostRecentOpIndex < 0 { ··· 125 126 // timestamps must increase monotonically 127 mostRecentOp := partialLog[mostRecentOpIndex] 128 - mostRecentCreatedAt, err := syntax.ParseDatetime(mostRecentOp.CreatedAt) 129 - if err != nil { 130 - return OperationEffects{}, stacktrace.Propagate(err, "reached invalid internal state") 131 - } 132 - if !timestamp.Time().After(mostRecentCreatedAt.Time()) { 133 return OperationEffects{}, stacktrace.Propagate(ErrInvalidOperationSequence, "") 134 } 135 ··· 156 } 157 158 // recovery key gets a 72hr window to do historical re-writes 159 - firstNullifiedCreatedAt, err := syntax.ParseDatetime(nullifiedEntries[0].CreatedAt) 160 - if err != nil { 161 - return OperationEffects{}, stacktrace.Propagate(err, "reached invalid internal state") 162 - } 163 - if timestamp.Time().Sub(firstNullifiedCreatedAt.Time()) > 72*time.Hour { 164 return OperationEffects{}, stacktrace.Propagate(ErrRecoveryWindowExpired, "") 165 } 166 } else { ··· 230 for _, entry := range v.auditLogFetcher.AuditLogReverseIterator(atHeight, did, &err) { 231 if entry.Nullified { 232 // The typescript implementation operates over a `ops` array which doesn't include nullified ops 233 - // (With recovery ops also skipping rate limits, doesn't this leave the PLC vulnerable to the spam of constant recovery operations?) 234 continue 235 } 236 - // Parse the CreatedAt timestamp string 237 - // The CreatedAt field is stored as a string in ISO 8601 format 238 - opDatetime, err := syntax.ParseDatetime(entry.CreatedAt) 239 - if err != nil { 240 - return stacktrace.Propagate(err, "") 241 - } 242 - opTime := opDatetime.Time() 243 244 if opTime.Before(weekAgo) { 245 // operations are always ordered by timestamp, and we're iterating from newest to oldest
··· 11 "github.com/did-method-plc/go-didplc" 12 "github.com/palantir/stacktrace" 13 "github.com/samber/mo" 14 + "tangled.org/gbl08ma/didplcbft/types" 15 ) 16 17 type AuditLogFetcher interface { 18 // AuditLogReverseIterator should return an iterator over the list of log entries for the specified DID, in reverse 19 + AuditLogReverseIterator(atHeight TreeVersion, did string, err *error) iter.Seq2[int, types.SequencedLogEntry] 20 } 21 22 type V0OperationValidator struct { ··· 75 76 proposedPrev := op.PrevCIDStr() 77 78 + partialLog := make(map[int]types.SequencedLogEntry) 79 mostRecentOpIndex := -1 80 indexOfPrev := -1 81 var iteratorErr error ··· 89 } 90 } 91 92 + if entry.CID.String() == proposedPrev { 93 indexOfPrev = entryIdx 94 break 95 } ··· 99 return OperationEffects{}, stacktrace.Propagate(iteratorErr, "") 100 } 101 102 + nullifiedEntries := []types.SequencedLogEntry{} 103 nullifiedEntriesStartingIndex := mo.None[int]() 104 105 if mostRecentOpIndex < 0 { ··· 126 127 // timestamps must increase monotonically 128 mostRecentOp := partialLog[mostRecentOpIndex] 129 + if !timestamp.Time().After(mostRecentOp.CreatedAt) { 130 return OperationEffects{}, stacktrace.Propagate(ErrInvalidOperationSequence, "") 131 } 132 ··· 153 } 154 155 // recovery key gets a 72hr window to do historical re-writes 156 + if timestamp.Time().Sub(nullifiedEntries[0].CreatedAt) > 72*time.Hour { 157 return OperationEffects{}, stacktrace.Propagate(ErrRecoveryWindowExpired, "") 158 } 159 } else { ··· 223 for _, entry := range v.auditLogFetcher.AuditLogReverseIterator(atHeight, did, &err) { 224 if entry.Nullified { 225 // The typescript implementation operates over a `ops` array which doesn't include nullified ops 226 + // (With recovery ops also skipping rate limits, doesn't this leave the PLC vulnerable to the spam of constant recovery operations? TODO investigate) 227 continue 228 } 229 + opTime := entry.CreatedAt 230 231 if opTime.Before(weekAgo) { 232 // operations are always ordered by timestamp, and we're iterating from newest to oldest
+2 -1
plc/plc.go
··· 7 8 "github.com/bluesky-social/indigo/atproto/syntax" 9 "github.com/did-method-plc/go-didplc" 10 ) 11 12 var ErrDIDNotFound = errors.New("DID not found") ··· 60 AuditLog(ctx context.Context, atHeight TreeVersion, did string) ([]didplc.LogEntry, error) 61 LastOperation(ctx context.Context, atHeight TreeVersion, did string) (didplc.OpEnum, error) 62 Data(ctx context.Context, atHeight TreeVersion, did string) (didplc.RegularOp, error) 63 - Export(ctx context.Context, atHeight TreeVersion, after time.Time, count int) ([]didplc.LogEntry, error) 64 } 65 66 type WritePLC interface {
··· 7 8 "github.com/bluesky-social/indigo/atproto/syntax" 9 "github.com/did-method-plc/go-didplc" 10 + "tangled.org/gbl08ma/didplcbft/types" 11 ) 12 13 var ErrDIDNotFound = errors.New("DID not found") ··· 61 AuditLog(ctx context.Context, atHeight TreeVersion, did string) ([]didplc.LogEntry, error) 62 LastOperation(ctx context.Context, atHeight TreeVersion, did string) (didplc.OpEnum, error) 63 Data(ctx context.Context, atHeight TreeVersion, did string) (didplc.RegularOp, error) 64 + Export(ctx context.Context, atHeight TreeVersion, after uint64, count int) ([]types.SequencedLogEntry, error) 65 } 66 67 type WritePLC interface {
+132 -36
plc/plc_test.go
··· 16 "github.com/samber/lo" 17 "github.com/stretchr/testify/require" 18 "tangled.org/gbl08ma/didplcbft/plc" 19 ) 20 21 func TestPLC(t *testing.T) { ··· 190 doc, err = testPLC.Resolve(ctx, plc.SpecificTreeVersion(origVersion+4), testDID) 191 require.NoError(t, err) 192 193 - export, err := testPLC.Export(ctx, plc.CommittedTreeVersion, time.Time{}, 1000) 194 require.NoError(t, err) 195 require.Len(t, export, 3) 196 197 require.Equal(t, "bafyreifgafcel2okxszhgbugieyvtmfig2gtf3dgqoh5fvdh3nlh6ncv6q", export[0].Operation.AsOperation().CID().String()) 198 - require.Equal(t, "bafyreifgafcel2okxszhgbugieyvtmfig2gtf3dgqoh5fvdh3nlh6ncv6q", export[0].CID) 199 require.Equal(t, "bafyreia6ewwkwjgly6dijfepaq2ey6zximodbtqqi5f6fyugli3cxohn5m", export[1].Operation.AsOperation().CID().String()) 200 - require.Equal(t, "bafyreia6ewwkwjgly6dijfepaq2ey6zximodbtqqi5f6fyugli3cxohn5m", export[1].CID) 201 require.Equal(t, "bafyreigyzl2esgnk7nvav5myvgywbshdmatzthc73iiar7tyeq3xjt47m4", export[2].Operation.AsOperation().CID().String()) 202 - require.Equal(t, "bafyreigyzl2esgnk7nvav5myvgywbshdmatzthc73iiar7tyeq3xjt47m4", export[2].CID) 203 204 - // the after parameter is exclusive, we should just get the second successful operation 205 - export, err = testPLC.Export(ctx, plc.CommittedTreeVersion, operations[1].ApplyAt.Time(), 1) 206 require.NoError(t, err) 207 require.Len(t, export, 1) 208 - require.Equal(t, "bafyreia6ewwkwjgly6dijfepaq2ey6zximodbtqqi5f6fyugli3cxohn5m", export[0].CID) 209 } 210 211 func TestPLCFromRemoteOperations(t *testing.T) { ··· 291 } 292 } 293 294 - export, err := testPLC.Export(ctx, plc.CommittedTreeVersion, time.Time{}, 0) 295 require.NoError(t, err) 296 - require.Len(t, export, 96) 297 298 // ensure entries are sorted correctly 299 - last := time.Time{} 300 for _, entry := range export { 301 - et, err := syntax.ParseDatetime(entry.CreatedAt) 302 - require.NoError(t, err) 303 - require.True(t, et.Time().After(last)) 304 - last = et.Time() 305 } 306 } 307 ··· 412 require.NoError(t, err) 413 414 seenCIDs := map[string]struct{}{} 415 - for entry := range iterateOverExport(ctx, "") { 416 err := testPLC.ImportOperationFromAuthoritativeSource(ctx, entry, func() ([]didplc.LogEntry, error) { 417 e, err := client.AuditLog(ctx, entry.DID) 418 return e, stacktrace.Propagate(err, "") ··· 420 require.NoError(t, err) 421 422 seenCIDs[entry.CID] = struct{}{} 423 - if len(seenCIDs) == 4000 { 424 break 425 } 426 } ··· 428 _, _, err = tree.SaveVersion() 429 require.NoError(t, err) 430 431 - exportedEntries, err := testPLC.Export(ctx, plc.CommittedTreeVersion, time.Time{}, len(seenCIDs)+1) 432 require.NoError(t, err) 433 434 require.Len(t, exportedEntries, len(seenCIDs)) 435 436 for _, exportedEntry := range exportedEntries { 437 - delete(seenCIDs, exportedEntry.CID) 438 } 439 require.Empty(t, seenCIDs) 440 } 441 442 - func iterateOverExport(ctx context.Context, startAt string) iter.Seq[didplc.LogEntry] { 443 return func(yield func(didplc.LogEntry) bool) { 444 const batchSize = 1000 445 baseURL := didplc.DefaultDirectoryURL + "/export" 446 client := &http.Client{Timeout: 30 * time.Second} 447 - 448 - // The /export seems to sometimes return outright duplicated entries :weary: 449 - seenCIDs := map[string]struct{}{} 450 451 after := startAt 452 for { ··· 459 460 q := req.URL.Query() 461 q.Add("count", fmt.Sprint(batchSize)) 462 - if after != "" { 463 - q.Add("after", after) 464 - } 465 req.URL.RawQuery = q.Encode() 466 467 resp, err := client.Do(req) ··· 474 return // Non-200 status code 475 } 476 477 - entries := make([]didplc.LogEntry, 0, batchSize) 478 479 // Read response body 480 s := bufio.NewScanner(resp.Body) 481 receivedEntries := 0 482 for s.Scan() { 483 - var entry didplc.LogEntry 484 if err := json.Unmarshal(s.Bytes(), &entry); err != nil { 485 return // Failed to decode JSON 486 } 487 - if _, present := seenCIDs[entry.CID]; !present { 488 - entries = append(entries, entry) 489 - seenCIDs[entry.CID] = struct{}{} 490 - } 491 receivedEntries++ 492 } 493 if s.Err() != nil { ··· 499 } 500 501 // Process each entry 502 - var lastCreatedAt string 503 for _, entry := range entries { 504 - lastCreatedAt = entry.CreatedAt 505 - if !yield(entry) { 506 return 507 } 508 } ··· 510 if receivedEntries < batchSize { 511 return 512 } 513 - 514 - after = lastCreatedAt 515 } 516 } 517 }
··· 16 "github.com/samber/lo" 17 "github.com/stretchr/testify/require" 18 "tangled.org/gbl08ma/didplcbft/plc" 19 + "tangled.org/gbl08ma/didplcbft/types" 20 ) 21 22 func TestPLC(t *testing.T) { ··· 191 doc, err = testPLC.Resolve(ctx, plc.SpecificTreeVersion(origVersion+4), testDID) 192 require.NoError(t, err) 193 194 + export, err := testPLC.Export(ctx, plc.CommittedTreeVersion, 0, 1000) 195 require.NoError(t, err) 196 require.Len(t, export, 3) 197 198 require.Equal(t, "bafyreifgafcel2okxszhgbugieyvtmfig2gtf3dgqoh5fvdh3nlh6ncv6q", export[0].Operation.AsOperation().CID().String()) 199 + require.Equal(t, "bafyreifgafcel2okxszhgbugieyvtmfig2gtf3dgqoh5fvdh3nlh6ncv6q", export[0].CID.String()) 200 require.Equal(t, "bafyreia6ewwkwjgly6dijfepaq2ey6zximodbtqqi5f6fyugli3cxohn5m", export[1].Operation.AsOperation().CID().String()) 201 + require.Equal(t, "bafyreia6ewwkwjgly6dijfepaq2ey6zximodbtqqi5f6fyugli3cxohn5m", export[1].CID.String()) 202 require.Equal(t, "bafyreigyzl2esgnk7nvav5myvgywbshdmatzthc73iiar7tyeq3xjt47m4", export[2].Operation.AsOperation().CID().String()) 203 + require.Equal(t, "bafyreigyzl2esgnk7nvav5myvgywbshdmatzthc73iiar7tyeq3xjt47m4", export[2].CID.String()) 204 205 + // the after parameter is exclusive, with a limit of 1, we should just get the second successful operation 206 + export, err = testPLC.Export(ctx, plc.CommittedTreeVersion, export[0].Seq, 1) 207 require.NoError(t, err) 208 require.Len(t, export, 1) 209 + require.Equal(t, "bafyreia6ewwkwjgly6dijfepaq2ey6zximodbtqqi5f6fyugli3cxohn5m", export[0].CID.String()) 210 } 211 212 func TestPLCFromRemoteOperations(t *testing.T) { ··· 292 } 293 } 294 295 + export, err := testPLC.Export(ctx, plc.CommittedTreeVersion, 0, 0) 296 require.NoError(t, err) 297 + require.Len(t, export, 100) 298 299 // ensure entries are sorted correctly 300 + last := uint64(0) 301 for _, entry := range export { 302 + require.True(t, entry.Seq > last) 303 + last = entry.Seq 304 } 305 } 306 ··· 411 require.NoError(t, err) 412 413 seenCIDs := map[string]struct{}{} 414 + for entry := range iterateOverExport(ctx, 0) { 415 err := testPLC.ImportOperationFromAuthoritativeSource(ctx, entry, func() ([]didplc.LogEntry, error) { 416 e, err := client.AuditLog(ctx, entry.DID) 417 return e, stacktrace.Propagate(err, "") ··· 419 require.NoError(t, err) 420 421 seenCIDs[entry.CID] = struct{}{} 422 + if len(seenCIDs) == 10000 { 423 break 424 } 425 } ··· 427 _, _, err = tree.SaveVersion() 428 require.NoError(t, err) 429 430 + exportedEntries, err := testPLC.Export(ctx, plc.CommittedTreeVersion, 0, len(seenCIDs)+1) 431 require.NoError(t, err) 432 433 require.Len(t, exportedEntries, len(seenCIDs)) 434 435 for _, exportedEntry := range exportedEntries { 436 + delete(seenCIDs, exportedEntry.CID.String()) 437 } 438 require.Empty(t, seenCIDs) 439 } 440 441 + func TestImportOperationWithNullification(t *testing.T) { 442 + var client didplc.Client 443 + 444 + ctx := t.Context() 445 + 446 + testFn := func(toImport []didplc.LogEntry, mutate func(didplc.LogEntry) didplc.LogEntry) ([]types.SequencedLogEntry, []didplc.LogEntry) { 447 + treeProvider := NewTestTreeProvider() 448 + testPLC := plc.NewPLC(treeProvider) 449 + 450 + tree, err := treeProvider.MutableTree() 451 + require.NoError(t, err) 452 + _, _, err = tree.SaveVersion() 453 + require.NoError(t, err) 454 + 455 + for _, entry := range toImport { 456 + entry = mutate(entry) 457 + err := testPLC.ImportOperationFromAuthoritativeSource(ctx, entry, func() ([]didplc.LogEntry, error) { 458 + e, err := client.AuditLog(ctx, entry.DID) 459 + return e, stacktrace.Propagate(err, "") 460 + }) 461 + require.NoError(t, err) 462 + } 463 + 464 + _, _, err = tree.SaveVersion() 465 + require.NoError(t, err) 466 + 467 + exportedEntries, err := testPLC.Export(ctx, plc.CommittedTreeVersion, 0, len(toImport)+1) 468 + require.NoError(t, err) 469 + 470 + require.Len(t, exportedEntries, len(toImport)) 471 + 472 + auditLog, err := testPLC.AuditLog(ctx, plc.CommittedTreeVersion, "did:plc:pkmfz5soq2swsvbhvjekb36g") 473 + require.NoError(t, err) 474 + 475 + return exportedEntries, auditLog 476 + } 477 + 478 + toImport, err := client.AuditLog(ctx, "did:plc:pkmfz5soq2swsvbhvjekb36g") 479 + require.NoError(t, err) 480 + 481 + exportedEntries, auditLog := testFn(toImport, func(le didplc.LogEntry) didplc.LogEntry { return le }) 482 + require.Len(t, auditLog, len(toImport)) 483 + 484 + for i, entry := range exportedEntries { 485 + require.Equal(t, uint64(i+1), entry.Seq) 486 + require.Equal(t, toImport[i].CID, entry.CID.String()) 487 + require.Equal(t, toImport[i].CID, auditLog[i].CID) 488 + require.Equal(t, toImport[i].CreatedAt, entry.CreatedAt.Format(types.ActualAtprotoDatetimeLayout)) 489 + require.Equal(t, toImport[i].CreatedAt, auditLog[i].CreatedAt) 490 + require.Equal(t, toImport[i].Nullified, entry.Nullified) 491 + require.Equal(t, toImport[i].Nullified, auditLog[i].Nullified) 492 + } 493 + 494 + // ensure auditLog never returns nullified entries as the last entries 495 + exportedEntries, auditLog = testFn(toImport[0:5], func(le didplc.LogEntry) didplc.LogEntry { return le }) 496 + 497 + require.Len(t, exportedEntries, 5) 498 + require.Len(t, auditLog, 1) 499 + require.False(t, auditLog[0].Nullified) 500 + require.Equal(t, auditLog[0].CID, "bafyreid2tbopmtuguvuvij5kjcqo7rv7yvqza37uvfcvk5zdxyo57xlfdi") 501 + 502 + // now pretend that at the time of import, no operations were nullified 503 + exportedEntries, auditLog = testFn(toImport, func(le didplc.LogEntry) didplc.LogEntry { 504 + le.Nullified = false 505 + return le 506 + }) 507 + require.Len(t, auditLog, len(toImport)) 508 + 509 + for i, entry := range exportedEntries { 510 + if i < 1 { 511 + require.Equal(t, uint64(i+1), entry.Seq) 512 + } else { 513 + require.Equal(t, uint64(i+5), entry.Seq) 514 + } 515 + require.Equal(t, toImport[i].CID, entry.CID.String()) 516 + require.Equal(t, toImport[i].CID, auditLog[i].CID) 517 + require.Equal(t, toImport[i].CreatedAt, entry.CreatedAt.Format(types.ActualAtprotoDatetimeLayout)) 518 + require.Equal(t, toImport[i].CreatedAt, auditLog[i].CreatedAt) 519 + require.Equal(t, toImport[i].Nullified, entry.Nullified) 520 + require.Equal(t, toImport[i].Nullified, auditLog[i].Nullified) 521 + } 522 + 523 + // now manipulate the timestamp on the first operation just to see the first operation get rewritten 524 + exportedEntries, auditLog = testFn(toImport, func(le didplc.LogEntry) didplc.LogEntry { 525 + if le.CID == "bafyreid2tbopmtuguvuvij5kjcqo7rv7yvqza37uvfcvk5zdxyo57xlfdi" { 526 + // this should cause mustFullyReplaceHistory to become true 527 + le.CreatedAt = syntax.DatetimeNow().String() 528 + } 529 + return le 530 + }) 531 + require.Len(t, auditLog, len(toImport)) 532 + 533 + for i, entry := range exportedEntries { 534 + require.Equal(t, uint64(i+2), entry.Seq) 535 + require.Equal(t, toImport[i].CID, entry.CID.String()) 536 + require.Equal(t, toImport[i].CID, auditLog[i].CID) 537 + require.Equal(t, toImport[i].CreatedAt, entry.CreatedAt.Format(types.ActualAtprotoDatetimeLayout)) 538 + require.Equal(t, toImport[i].CreatedAt, auditLog[i].CreatedAt) 539 + require.Equal(t, toImport[i].Nullified, entry.Nullified) 540 + require.Equal(t, toImport[i].Nullified, auditLog[i].Nullified) 541 + } 542 + } 543 + 544 + func iterateOverExport(ctx context.Context, startAt uint64) iter.Seq[didplc.LogEntry] { 545 return func(yield func(didplc.LogEntry) bool) { 546 const batchSize = 1000 547 baseURL := didplc.DefaultDirectoryURL + "/export" 548 client := &http.Client{Timeout: 30 * time.Second} 549 550 after := startAt 551 for { ··· 558 559 q := req.URL.Query() 560 q.Add("count", fmt.Sprint(batchSize)) 561 + q.Add("after", fmt.Sprint(after)) 562 req.URL.RawQuery = q.Encode() 563 564 resp, err := client.Do(req) ··· 571 return // Non-200 status code 572 } 573 574 + type logEntryWithSeq struct { 575 + didplc.LogEntry 576 + Seq uint64 `json:"seq"` 577 + } 578 + 579 + entries := make([]logEntryWithSeq, 0, batchSize) 580 581 // Read response body 582 s := bufio.NewScanner(resp.Body) 583 receivedEntries := 0 584 for s.Scan() { 585 + var entry logEntryWithSeq 586 if err := json.Unmarshal(s.Bytes(), &entry); err != nil { 587 return // Failed to decode JSON 588 } 589 + entries = append(entries, entry) 590 receivedEntries++ 591 } 592 if s.Err() != nil { ··· 598 } 599 600 // Process each entry 601 for _, entry := range entries { 602 + after = entry.Seq 603 + if !yield(entry.LogEntry) { 604 return 605 } 606 } ··· 608 if receivedEntries < batchSize { 609 return 610 } 611 } 612 } 613 }
+158 -108
store/tree.go
··· 4 "encoding/base32" 5 "encoding/binary" 6 "iter" 7 "slices" 8 "strings" 9 "time" ··· 17 "github.com/polydawn/refmt/obj/atlas" 18 "github.com/samber/lo" 19 "github.com/samber/mo" 20 ) 21 22 - // ActualAtprotoDatetimeLayout is the format for CreatedAt timestamps 23 - // AtprotoDatetimeLayout as defined by github.com/bluesky-social/indigo/atproto/syntax omits trailing zeros in the milliseconds 24 - // This doesn't match how the official plc.directory implementation formats them, so we define that format here with trailing zeros included 25 - const ActualAtprotoDatetimeLayout = "2006-01-02T15:04:05.000Z" 26 - 27 var Tree PLCTreeStore = &TreeStore{} 28 29 type PLCTreeStore interface { 30 - AuditLog(tree ReadOnlyTree, did string, withProof bool) ([]didplc.LogEntry, *ics23.CommitmentProof, error) 31 - AuditLogReverseIterator(tree ReadOnlyTree, did string, err *error) iter.Seq2[int, didplc.LogEntry] 32 - ExportOperations(tree ReadOnlyTree, after time.Time, count int) ([]didplc.LogEntry, error) // passing a count of zero means unlimited 33 StoreOperation(tree *iavl.MutableTree, entry didplc.LogEntry, nullifyWithIndexEqualOrGreaterThan mo.Option[int]) error 34 ReplaceHistory(tree *iavl.MutableTree, history []didplc.LogEntry) error 35 } ··· 39 // TreeStore exists just to groups methods nicely 40 type TreeStore struct{} 41 42 - func (t *TreeStore) AuditLog(tree ReadOnlyTree, did string, withProof bool) ([]didplc.LogEntry, *ics23.CommitmentProof, error) { 43 proofs := []*ics23.CommitmentProof{} 44 45 didBytes, err := didToBytes(did) ··· 61 return nil, nil, stacktrace.Propagate(err, "") 62 } 63 operationKeys = make([][]byte, 0, len(logOperations)/8) 64 - for ts := range slices.Chunk(logOperations, 8) { 65 - operationKeys = append(operationKeys, timestampBytesToDIDOperationKey(ts, didBytes)) 66 } 67 } 68 ··· 74 proofs = append(proofs, proof) 75 } 76 77 - logEntries := make([]didplc.LogEntry, 0, len(operationKeys)) 78 for _, opKey := range operationKeys { 79 operationValue, err := tree.Get(opKey) 80 if err != nil { ··· 89 proofs = append(proofs, proof) 90 } 91 92 - nullified, operation, err := unmarshalOperationValue(operationValue) 93 if err != nil { 94 return nil, nil, stacktrace.Propagate(err, "") 95 } 96 97 - timestamp, actualDID, err := unmarshalOperationKey(opKey) 98 - if err != nil { 99 - return nil, nil, stacktrace.Propagate(err, "") 100 - } 101 - 102 - logEntries = append(logEntries, didplc.LogEntry{ 103 - DID: actualDID, 104 - Operation: operation, 105 - CID: operation.AsOperation().CID().String(), 106 - Nullified: nullified, 107 - CreatedAt: timestamp.Format(ActualAtprotoDatetimeLayout), 108 - }) 109 } 110 111 var combinedProof *ics23.CommitmentProof ··· 118 return logEntries, combinedProof, nil 119 } 120 121 - func (t *TreeStore) AuditLogReverseIterator(tree ReadOnlyTree, did string, retErr *error) iter.Seq2[int, didplc.LogEntry] { 122 - return func(yield func(int, didplc.LogEntry) bool) { 123 didBytes, err := didToBytes(did) 124 if err != nil { 125 *retErr = stacktrace.Propagate(err, "") ··· 142 return 143 } 144 operationKeys = make([][]byte, 0, len(logOperations)/8) 145 - for ts := range slices.Chunk(logOperations, 8) { 146 - operationKeys = append(operationKeys, timestampBytesToDIDOperationKey(ts, didBytes)) 147 } 148 } 149 ··· 155 return 156 } 157 158 - nullified, operation, err := unmarshalOperationValue(operationValue) 159 if err != nil { 160 *retErr = stacktrace.Propagate(err, "") 161 return 162 } 163 164 - timestamp, actualDID, err := unmarshalOperationKey(opKey) 165 - if err != nil { 166 - *retErr = stacktrace.Propagate(err, "") 167 - return 168 - } 169 - 170 - if !yield(i, didplc.LogEntry{ 171 - DID: actualDID, 172 - Operation: operation, 173 - CID: operation.AsOperation().CID().String(), 174 - Nullified: nullified, 175 - CreatedAt: timestamp.Format(ActualAtprotoDatetimeLayout), 176 - }) { 177 return 178 } 179 } 180 } 181 } 182 183 - func (t *TreeStore) ExportOperations(tree ReadOnlyTree, after time.Time, count int) ([]didplc.LogEntry, error) { 184 // as the name suggests, after is an exclusive lower bound, but our iterators use inclusive lower bounds 185 - start := after.Add(1 * time.Nanosecond) 186 - startKey := marshalOperationKey(start, make([]byte, 15)) 187 - if after.UnixNano() < 0 { 188 - // our storage format doesn't deal well with negative unix timestamps, 189 - // but that's fine because we don't have operations created that far back. assume we just want to iterate from the start 190 - copy(startKey[1:8], make([]byte, 8)) 191 - } 192 193 - entries := make([]didplc.LogEntry, 0, count) 194 var iterErr error 195 - tree.IterateRange(startKey, nil, true, func(operationKey, operationValue []byte) bool { 196 - nullified, operation, err := unmarshalOperationValue(operationValue) 197 if err != nil { 198 iterErr = stacktrace.Propagate(err, "") 199 return true 200 } 201 202 - timestamp, actualDID, err := unmarshalOperationKey(operationKey) 203 - if err != nil { 204 - iterErr = stacktrace.Propagate(err, "") 205 - return true 206 - } 207 - 208 - entries = append(entries, didplc.LogEntry{ 209 - DID: actualDID, 210 - Operation: operation, 211 - CID: operation.AsOperation().CID().String(), 212 - Nullified: nullified, 213 - CreatedAt: timestamp.Format(ActualAtprotoDatetimeLayout), 214 - }) 215 return len(entries) == count // this condition being checked here also makes it so that a count of zero means unlimited 216 }) 217 if iterErr != nil { ··· 237 operationKeys = [][]byte{} 238 } else { 239 operationKeys = make([][]byte, 0, len(logOperations)/8) 240 - for ts := range slices.Chunk(logOperations, 8) { 241 - operationKeys = append(operationKeys, timestampBytesToDIDOperationKey(ts, didBytes)) 242 } 243 } 244 ··· 262 return stacktrace.Propagate(err, "invalid CreatedAt") 263 } 264 265 operation := entry.Operation.AsOperation() 266 - opKey := marshalOperationKey(opDatetime.Time(), didBytes) 267 - opValue := marshalOperationValue(entry.Nullified, operation) 268 269 _, err = tree.Set(opKey, opValue) 270 if err != nil { ··· 280 return nil 281 } 282 283 - func (t *TreeStore) ReplaceHistory(tree *iavl.MutableTree, history []didplc.LogEntry) error { 284 - if len(history) == 0 { 285 // for now this isn't needed, if it's needed in the future we'll have to accept a DID as argument on this function 286 return stacktrace.NewError("can't replace with empty history") 287 } 288 289 - did := history[0].DID 290 291 didBytes, err := didToBytes(did) 292 if err != nil { ··· 295 296 logKey := marshalDIDLogKey(didBytes) 297 298 - // identify keys of existing operations for this DID (if any) 299 - var prevOpKeys [][]byte 300 - logOperations, err := tree.Get(logKey) 301 if err != nil { 302 return stacktrace.Propagate(err, "") 303 } 304 - prevOpKeys = make([][]byte, 0, len(logOperations)/8) 305 - for ts := range slices.Chunk(logOperations, 8) { 306 - prevOpKeys = append(prevOpKeys, timestampBytesToDIDOperationKey(ts, didBytes)) 307 } 308 309 - // remove existing operations for this DID (if any) 310 - for _, key := range prevOpKeys { 311 _, _, err = tree.Remove(key) 312 if err != nil { 313 return stacktrace.Propagate(err, "") 314 } 315 } 316 317 - // add new list of operations 318 - logOperations = make([]byte, 0, len(history)*8) 319 - for _, entry := range history { 320 opDatetime, err := syntax.ParseDatetime(entry.CreatedAt) 321 if err != nil { 322 return stacktrace.Propagate(err, "invalid CreatedAt") 323 } 324 325 operation := entry.Operation.AsOperation() 326 - opKey := marshalOperationKey(opDatetime.Time(), didBytes) 327 - opValue := marshalOperationValue(entry.Nullified, operation) 328 329 _, err = tree.Set(opKey, opValue) 330 if err != nil { ··· 344 return nil 345 } 346 347 func didToBytes(did string) ([]byte, error) { 348 if !strings.HasPrefix(did, "did:plc:") { 349 return nil, stacktrace.NewError("invalid did:plc") ··· 379 return key 380 } 381 382 - func timestampBytesToDIDOperationKey(timestamp []byte, didBytes []byte) []byte { 383 - key := make([]byte, 1+8+15) 384 key[0] = 'o' 385 - copy(key[1:9], timestamp) 386 - copy(key[9:], didBytes) 387 return key 388 } 389 390 - func marshalOperationKey(createdAt time.Time, didBytes []byte) []byte { 391 - key := make([]byte, 1+8+15) 392 key[0] = 'o' 393 394 - ts := uint64(createdAt.Truncate(1 * time.Millisecond).UTC().UnixNano()) 395 - binary.BigEndian.PutUint64(key[1:], ts) 396 397 - copy(key[9:], didBytes) 398 return key 399 } 400 401 - func unmarshalOperationKey(key []byte) (time.Time, string, error) { 402 - createdAtUnixNano := binary.BigEndian.Uint64(key[1:9]) 403 - createdAt := time.Unix(0, int64(createdAtUnixNano)).UTC() 404 - did, err := bytesToDID(key[9:]) 405 - return createdAt, did, stacktrace.Propagate(err, "") 406 } 407 408 - func marshalOperationValue(nullified bool, operation didplc.Operation) []byte { 409 - o := []byte{lo.Ternary[byte](nullified, 1, 0)} 410 - o = append(o, operation.SignedCBORBytes()...) 411 return o 412 } 413 414 - func unmarshalOperationValue(value []byte) (bool, didplc.OpEnum, error) { 415 nullified := value[0] != 0 416 var opEnum didplc.OpEnum 417 - err := cbornode.DecodeInto(value[1:], &opEnum) 418 if err != nil { 419 - return false, didplc.OpEnum{}, stacktrace.Propagate(err, "") 420 } 421 - return nullified, opEnum, nil 422 } 423 424 func init() {
··· 4 "encoding/base32" 5 "encoding/binary" 6 "iter" 7 + "math" 8 "slices" 9 "strings" 10 "time" ··· 18 "github.com/polydawn/refmt/obj/atlas" 19 "github.com/samber/lo" 20 "github.com/samber/mo" 21 + "tangled.org/gbl08ma/didplcbft/types" 22 ) 23 24 var Tree PLCTreeStore = &TreeStore{} 25 26 type PLCTreeStore interface { 27 + AuditLog(tree ReadOnlyTree, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) 28 + AuditLogReverseIterator(tree ReadOnlyTree, did string, err *error) iter.Seq2[int, types.SequencedLogEntry] 29 + ExportOperations(tree ReadOnlyTree, after uint64, count int) ([]types.SequencedLogEntry, error) // passing a count of zero means unlimited 30 StoreOperation(tree *iavl.MutableTree, entry didplc.LogEntry, nullifyWithIndexEqualOrGreaterThan mo.Option[int]) error 31 ReplaceHistory(tree *iavl.MutableTree, history []didplc.LogEntry) error 32 } ··· 36 // TreeStore exists just to groups methods nicely 37 type TreeStore struct{} 38 39 + func (t *TreeStore) AuditLog(tree ReadOnlyTree, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) { 40 proofs := []*ics23.CommitmentProof{} 41 42 didBytes, err := didToBytes(did) ··· 58 return nil, nil, stacktrace.Propagate(err, "") 59 } 60 operationKeys = make([][]byte, 0, len(logOperations)/8) 61 + for seqBytes := range slices.Chunk(logOperations, 8) { 62 + operationKeys = append(operationKeys, sequenceBytesToOperationKey(seqBytes)) 63 } 64 } 65 ··· 71 proofs = append(proofs, proof) 72 } 73 74 + logEntries := make([]types.SequencedLogEntry, 0, len(operationKeys)) 75 for _, opKey := range operationKeys { 76 operationValue, err := tree.Get(opKey) 77 if err != nil { ··· 86 proofs = append(proofs, proof) 87 } 88 89 + logEntry, err := unmarshalLogEntry(opKey, operationValue) 90 if err != nil { 91 return nil, nil, stacktrace.Propagate(err, "") 92 } 93 94 + logEntries = append(logEntries, logEntry) 95 } 96 97 var combinedProof *ics23.CommitmentProof ··· 104 return logEntries, combinedProof, nil 105 } 106 107 + func (t *TreeStore) AuditLogReverseIterator(tree ReadOnlyTree, did string, retErr *error) iter.Seq2[int, types.SequencedLogEntry] { 108 + return func(yield func(int, types.SequencedLogEntry) bool) { 109 didBytes, err := didToBytes(did) 110 if err != nil { 111 *retErr = stacktrace.Propagate(err, "") ··· 128 return 129 } 130 operationKeys = make([][]byte, 0, len(logOperations)/8) 131 + for seqBytes := range slices.Chunk(logOperations, 8) { 132 + operationKeys = append(operationKeys, sequenceBytesToOperationKey(seqBytes)) 133 } 134 } 135 ··· 141 return 142 } 143 144 + logEntry, err := unmarshalLogEntry(opKey, operationValue) 145 if err != nil { 146 *retErr = stacktrace.Propagate(err, "") 147 return 148 } 149 150 + if !yield(i, logEntry) { 151 return 152 } 153 } 154 } 155 } 156 157 + func (t *TreeStore) ExportOperations(tree ReadOnlyTree, after uint64, count int) ([]types.SequencedLogEntry, error) { 158 // as the name suggests, after is an exclusive lower bound, but our iterators use inclusive lower bounds 159 + start := after + 1 160 + startKey := marshalOperationKey(start) 161 + endKey := maxOperationKey 162 163 + entries := make([]types.SequencedLogEntry, 0, count) 164 var iterErr error 165 + tree.IterateRange(startKey, endKey, true, func(operationKey, operationValue []byte) bool { 166 + logEntry, err := unmarshalLogEntry(operationKey, operationValue) 167 if err != nil { 168 iterErr = stacktrace.Propagate(err, "") 169 return true 170 } 171 172 + entries = append(entries, logEntry) 173 return len(entries) == count // this condition being checked here also makes it so that a count of zero means unlimited 174 }) 175 if iterErr != nil { ··· 195 operationKeys = [][]byte{} 196 } else { 197 operationKeys = make([][]byte, 0, len(logOperations)/8) 198 + for seqBytes := range slices.Chunk(logOperations, 8) { 199 + operationKeys = append(operationKeys, sequenceBytesToOperationKey(seqBytes)) 200 } 201 } 202 ··· 220 return stacktrace.Propagate(err, "invalid CreatedAt") 221 } 222 223 + seq, err := getNextSeqID(tree) 224 + if err != nil { 225 + return stacktrace.Propagate(err, "") 226 + } 227 + 228 operation := entry.Operation.AsOperation() 229 + opKey := marshalOperationKey(seq) 230 + opValue := marshalOperationValue(entry.Nullified, didBytes, opDatetime.Time(), operation) 231 232 _, err = tree.Set(opKey, opValue) 233 if err != nil { ··· 243 return nil 244 } 245 246 + func (t *TreeStore) ReplaceHistory(tree *iavl.MutableTree, remoteHistory []didplc.LogEntry) error { 247 + if len(remoteHistory) == 0 { 248 // for now this isn't needed, if it's needed in the future we'll have to accept a DID as argument on this function 249 return stacktrace.NewError("can't replace with empty history") 250 } 251 252 + did := remoteHistory[0].DID 253 254 didBytes, err := didToBytes(did) 255 if err != nil { ··· 258 259 logKey := marshalDIDLogKey(didBytes) 260 261 + localHistory, _, err := t.AuditLog(tree, did, false) 262 if err != nil { 263 return stacktrace.Propagate(err, "") 264 } 265 + 266 + // if the first operations are equal to what we already have, keep them untouched to minimize the turmoil 267 + keepLocalBeforeIdx := 0 268 + for i, localEntry := range localHistory { 269 + if i >= len(remoteHistory) { 270 + break 271 + } 272 + remoteEntry := remoteHistory[i] 273 + 274 + // stop looping once we find a difference 275 + // we trust that the authoritative source computes CIDs properly (i.e. that two operations having the same CID are indeed equal) 276 + if localEntry.Nullified != remoteEntry.Nullified || localEntry.CID.String() != remoteEntry.CID { 277 + break 278 + } 279 + 280 + remoteDatetime, err := syntax.ParseDatetime(remoteEntry.CreatedAt) 281 + if err != nil { 282 + return stacktrace.Propagate(err, "invalid CreatedAt") 283 + } 284 + 285 + if !localEntry.CreatedAt.Equal(remoteDatetime.Time()) { 286 + break 287 + } 288 + 289 + keepLocalBeforeIdx++ 290 } 291 292 + // all replaced/added operations get new sequence IDs. 293 + // Get the highest sequence ID before removing any keys to ensure the sequence IDs actually change 294 + seq, err := getNextSeqID(tree) 295 + if err != nil { 296 + return stacktrace.Propagate(err, "") 297 + } 298 + 299 + // remove existing conflicting operations for this DID (if any) 300 + logOperations, err := tree.Get(logKey) 301 + if err != nil { 302 + return stacktrace.Propagate(err, "") 303 + } 304 + logOperationsToDelete := logOperations[8*keepLocalBeforeIdx:] 305 + for seqBytes := range slices.Chunk(logOperationsToDelete, 8) { 306 + key := sequenceBytesToOperationKey(seqBytes) 307 + 308 _, _, err = tree.Remove(key) 309 if err != nil { 310 return stacktrace.Propagate(err, "") 311 } 312 } 313 314 + // add just the operations past the point they weren't kept 315 + remoteHistory = remoteHistory[keepLocalBeforeIdx:] 316 + 317 + // keep the operations log up until the point we've kept the history 318 + // clone just to make sure we avoid issues since we got this slice from the tree, it is not meant to be modified 319 + logOperations = slices.Clone(logOperations[0 : 8*keepLocalBeforeIdx]) 320 + 321 + for _, entry := range remoteHistory { 322 opDatetime, err := syntax.ParseDatetime(entry.CreatedAt) 323 if err != nil { 324 return stacktrace.Propagate(err, "invalid CreatedAt") 325 } 326 327 operation := entry.Operation.AsOperation() 328 + opKey := marshalOperationKey(seq) 329 + seq++ 330 + opValue := marshalOperationValue(entry.Nullified, didBytes, opDatetime.Time(), operation) 331 332 _, err = tree.Set(opKey, opValue) 333 if err != nil { ··· 347 return nil 348 } 349 350 + var minOperationKey = marshalOperationKey(0) 351 + var maxOperationKey = marshalOperationKey(math.MaxInt64) 352 + 353 + func getNextSeqID(tree *iavl.MutableTree) (uint64, error) { 354 + seq := uint64(0) 355 + var err error 356 + tree.IterateRange(minOperationKey, maxOperationKey, false, func(key, value []byte) bool { 357 + seq, err = unmarshalOperationKey(key) 358 + return true 359 + }) 360 + 361 + return seq + 1, stacktrace.Propagate(err, "") 362 + } 363 + 364 func didToBytes(did string) ([]byte, error) { 365 if !strings.HasPrefix(did, "did:plc:") { 366 return nil, stacktrace.NewError("invalid did:plc") ··· 396 return key 397 } 398 399 + func sequenceBytesToOperationKey(sequenceBytes []byte) []byte { 400 + key := make([]byte, 1+8) 401 key[0] = 'o' 402 + copy(key[1:9], sequenceBytes) 403 return key 404 } 405 406 + func marshalOperationKey(sequence uint64) []byte { 407 + key := make([]byte, 1+8) 408 key[0] = 'o' 409 410 + binary.BigEndian.PutUint64(key[1:], sequence) 411 412 return key 413 } 414 415 + func unmarshalOperationKey(key []byte) (uint64, error) { 416 + return binary.BigEndian.Uint64(key[1:9]), nil 417 } 418 419 + func marshalOperationValue(nullified bool, didBytes []byte, createdAt time.Time, operation didplc.Operation) []byte { 420 + opAsBytes := operation.SignedCBORBytes() 421 + o := make([]byte, 1+15+8+len(opAsBytes)) 422 + 423 + o[0] = lo.Ternary[byte](nullified, 1, 0) 424 + 425 + copy(o[1:16], didBytes) 426 + 427 + ts := uint64(createdAt.Truncate(1 * time.Millisecond).UTC().UnixNano()) 428 + binary.BigEndian.PutUint64(o[16:24], ts) 429 + copy(o[24:], opAsBytes) 430 + 431 return o 432 } 433 434 + func unmarshalOperationValue(value []byte) (bool, string, time.Time, didplc.OpEnum, error) { 435 nullified := value[0] != 0 436 + 437 + did, err := bytesToDID(value[1:16]) 438 + if err != nil { 439 + return false, "", time.Time{}, didplc.OpEnum{}, stacktrace.Propagate(err, "") 440 + } 441 + 442 + createdAtUnixNano := binary.BigEndian.Uint64(value[16:24]) 443 + createdAt := time.Unix(0, int64(createdAtUnixNano)).UTC() 444 + 445 var opEnum didplc.OpEnum 446 + err = cbornode.DecodeInto(value[24:], &opEnum) 447 + if err != nil { 448 + return false, "", time.Time{}, didplc.OpEnum{}, stacktrace.Propagate(err, "") 449 + } 450 + return nullified, did, createdAt, opEnum, nil 451 + } 452 + 453 + func unmarshalLogEntry(operationKey, operationValue []byte) (types.SequencedLogEntry, error) { 454 + nullified, actualDID, timestamp, operation, err := unmarshalOperationValue(operationValue) 455 + if err != nil { 456 + return types.SequencedLogEntry{}, stacktrace.Propagate(err, "") 457 + } 458 + 459 + seq, err := unmarshalOperationKey(operationKey) 460 if err != nil { 461 + return types.SequencedLogEntry{}, stacktrace.Propagate(err, "") 462 } 463 + 464 + return types.SequencedLogEntry{ 465 + Seq: seq, 466 + DID: actualDID, 467 + Operation: operation, 468 + CID: operation.AsOperation().CID(), 469 + Nullified: nullified, 470 + CreatedAt: timestamp, 471 + }, nil 472 } 473 474 func init() {
+32
types/log_entry.go
···
··· 1 + package types 2 + 3 + import ( 4 + "time" 5 + 6 + "github.com/did-method-plc/go-didplc" 7 + "github.com/ipfs/go-cid" 8 + ) 9 + 10 + type SequencedLogEntry struct { 11 + Seq uint64 12 + DID string 13 + Operation didplc.OpEnum 14 + CID cid.Cid 15 + Nullified bool 16 + CreatedAt time.Time 17 + } 18 + 19 + func (l SequencedLogEntry) ToDIDPLCLogEntry() didplc.LogEntry { 20 + return didplc.LogEntry{ 21 + DID: l.DID, 22 + Operation: l.Operation, 23 + CID: l.CID.String(), 24 + Nullified: l.Nullified, 25 + CreatedAt: l.CreatedAt.Format(ActualAtprotoDatetimeLayout), 26 + } 27 + } 28 + 29 + // ActualAtprotoDatetimeLayout is the format for CreatedAt timestamps 30 + // AtprotoDatetimeLayout as defined by github.com/bluesky-social/indigo/atproto/syntax omits trailing zeros in the milliseconds 31 + // This doesn't match how the official plc.directory implementation formats them, so we define that format here with trailing zeros included 32 + const ActualAtprotoDatetimeLayout = "2006-01-02T15:04:05.000Z"