A very experimental PLC implementation which uses BFT consensus for decentralization
at main 7.7 kB view raw
1package abciapp 2 3import ( 4 "bufio" 5 "context" 6 "crypto/sha256" 7 "encoding/binary" 8 "encoding/hex" 9 "encoding/json" 10 "fmt" 11 "net/http" 12 "net/url" 13 "sync" 14 "time" 15 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 "github.com/did-method-plc/go-didplc" 18 "github.com/ipfs/go-cid" 19 cbornode "github.com/ipfs/go-ipld-cbor" 20 "github.com/palantir/stacktrace" 21 "tangled.org/gbl08ma.com/didplcbft/store" 22) 23 24const EagerFetchMaxOps = 10000 25const OpsPerImportTx = 1000 26const OpsPerEagerFetch = 1000 27 28type authoritativeOperationsCache struct { 29 mu sync.Mutex 30 31 plcURL string 32 operations map[uint64]logEntryWithSeq 33 highestFetchedHeight uint64 34} 35 36type logEntryWithSeq struct { 37 didplc.LogEntry 38 Seq uint64 `json:"seq"` 39} 40 41func newAuthoritativeOperationsCache(ctx context.Context, plc string) *authoritativeOperationsCache { 42 aoc := &authoritativeOperationsCache{ 43 plcURL: plc, 44 operations: make(map[uint64]logEntryWithSeq), 45 } 46 47 go func() { 48 ticker := time.NewTicker(500 * time.Millisecond) 49 for { 50 select { 51 case <-ctx.Done(): 52 return 53 case <-ticker.C: 54 aoc.eagerlyFetch(ctx) 55 } 56 } 57 }() 58 59 return aoc 60} 61 62func getOrCreateAuthoritativeOperationsCache(ctx context.Context, aocsByPLC map[string]*authoritativeOperationsCache, plc string) *authoritativeOperationsCache { 63 aoc, ok := aocsByPLC[plc] 64 if !ok { 65 aoc = newAuthoritativeOperationsCache(ctx, plc) 66 aocsByPLC[plc] = aoc 67 } 68 return aoc 69} 70 71func (a *authoritativeOperationsCache) eagerlyFetch(ctx context.Context) { 72 a.mu.Lock() 73 defer a.mu.Unlock() 74 75 curOps := len(a.operations) 76 if curOps >= EagerFetchMaxOps { 77 return 78 } 79 _, _ = a.fetchInMutex(ctx, a.highestFetchedHeight, OpsPerEagerFetch) 80} 81 82func (a *authoritativeOperationsCache) dropSeqBelowOrEqual(highestCommittedSeq uint64) { 83 a.mu.Lock() 84 defer a.mu.Unlock() 85 86 for i := range a.operations { 87 if a.operations[i].Seq <= highestCommittedSeq { 88 delete(a.operations, i) 89 } 90 } 91} 92 93func (a *authoritativeOperationsCache) fetchInMutex(ctx context.Context, after, count uint64) (bool, error) { 94 entries, _, err := fetchExportedBatchFromAuthoritativeSource(ctx, a.plcURL, after, count) 95 if err != nil { 96 return false, stacktrace.Propagate(err, "") 97 } 98 99 for _, entry := range entries { 100 a.operations[entry.Seq] = entry 101 a.highestFetchedHeight = max(a.highestFetchedHeight, entry.Seq) 102 } 103 return uint64(len(entries)) < count, nil 104} 105 106func (a *authoritativeOperationsCache) get(ctx context.Context, after, count uint64) ([]logEntryWithSeq, error) { 107 a.mu.Lock() 108 defer a.mu.Unlock() 109 110 result := make([]logEntryWithSeq, 0, count) 111 reachedEnd := false 112 for i := uint64(0); uint64(len(result)) < count; i++ { 113 opSeq := after + i + 1 114 op, ok := a.operations[opSeq] 115 if !ok { 116 if reachedEnd { 117 // it's because we are asking about ops that don't exist yet, return 118 break 119 } 120 121 re, err := a.fetchInMutex(ctx, after+i, count) 122 if err != nil { 123 return nil, stacktrace.Propagate(err, "") 124 } 125 126 reachedEnd = reachedEnd || re 127 128 op, ok = a.operations[opSeq] 129 if !ok { 130 // still not present even after fetching 131 // the authoritative source probably skipped this seq? 132 continue 133 } 134 } 135 136 result = append(result, op) 137 } 138 139 return result, nil 140} 141 142func fetchExportedBatchFromAuthoritativeSource(ctx context.Context, plcURL string, startAt, maxCount uint64) ([]logEntryWithSeq, uint64, error) { 143 baseURL, err := url.JoinPath(plcURL, "/export") 144 if err != nil { 145 return nil, 0, stacktrace.Propagate(err, "") 146 } 147 148 client := &http.Client{Timeout: 30 * time.Second} 149 150 entries := make([]logEntryWithSeq, 0, maxCount) 151 for { 152 req, err := http.NewRequestWithContext(ctx, "GET", baseURL, nil) 153 if err != nil { 154 return nil, 0, stacktrace.Propagate(err, "") 155 } 156 157 req.Header.Set("User-Agent", "didplcbft") 158 159 requestCount := min(1000, maxCount-uint64(len(entries))) 160 161 q := req.URL.Query() 162 q.Add("count", fmt.Sprint(requestCount)) 163 q.Add("after", fmt.Sprint(startAt)) 164 req.URL.RawQuery = q.Encode() 165 166 resp, err := client.Do(req) 167 if err != nil { 168 return nil, 0, stacktrace.Propagate(err, "") 169 } 170 defer resp.Body.Close() 171 172 if resp.StatusCode != http.StatusOK { 173 return nil, 0, stacktrace.NewError("non-200 status code") 174 } 175 176 // Read response body 177 s := bufio.NewScanner(resp.Body) 178 numEntriesThisResponse := 0 179 for s.Scan() && len(entries) < int(maxCount) { 180 var entry logEntryWithSeq 181 if err := json.Unmarshal(s.Bytes(), &entry); err != nil { 182 return nil, 0, stacktrace.Propagate(err, "") 183 } 184 entries = append(entries, entry) 185 numEntriesThisResponse++ 186 startAt = entry.Seq 187 } 188 if s.Err() != nil { 189 return nil, 0, stacktrace.Propagate(s.Err(), "") 190 } 191 192 if uint64(numEntriesThisResponse) < requestCount || len(entries) >= int(maxCount) { 193 break 194 } 195 } 196 197 return entries, startAt, nil 198} 199 200func computeLogEntriesHash(logEntries []logEntryWithSeq) ([]byte, error) { 201 // let's _not_ rely on the specifics of the JSON representation 202 // (instead let's rely on specifics of our implementation, heh) 203 204 hash := sha256.New() 205 for i, entry := range logEntries { 206 // Write DID 207 didBytes, err := store.DIDToBytes(entry.DID) 208 if err != nil { 209 return nil, stacktrace.Propagate(err, "invalid DID in entry index %d", i) 210 } 211 212 _, err = hash.Write(didBytes) 213 if err != nil { 214 return nil, stacktrace.Propagate(err, "") 215 } 216 217 // Write CID 218 // (We trust that the authoritative source computed CIDs properly, so we use theirs rather than decoding the operation and recomputing) 219 cid, err := cid.Decode(entry.CID) 220 if err != nil { 221 return nil, stacktrace.Propagate(err, "invalid CID in entry index %d", i) 222 } 223 224 _, err = hash.Write(cid.Bytes()) 225 if err != nil { 226 return nil, stacktrace.Propagate(err, "") 227 } 228 229 // Write CreatedAt 230 createdAt, err := syntax.ParseDatetime(entry.CreatedAt) 231 if err != nil { 232 return nil, stacktrace.Propagate(err, "invalid CreatedAt in entry index %d", i) 233 } 234 235 tsBytes := make([]byte, 8) 236 binary.BigEndian.PutUint64(tsBytes, uint64(createdAt.Time().Truncate(1*time.Millisecond).UTC().UnixNano())) 237 238 _, err = hash.Write(tsBytes) 239 if err != nil { 240 return nil, stacktrace.Propagate(err, "") 241 } 242 243 // Nullified can't be part of the hash as it can change on the authoritative source at any moment, 244 // we always import operations as if they weren't nullified and recompute the nullification status as needed 245 } 246 247 return hash.Sum(nil), nil 248} 249 250func (d *DIDPLCApplication) maybeCreateAuthoritativeImportTx(ctx context.Context) ([]byte, error) { 251 // use WorkingTreeVersion so we take into account any import operation that may have been processed in this block 252 readTx := d.txFactory.ReadWorking(time.Now()) 253 254 plcURL, err := store.Tree.AuthoritativePLC(readTx) 255 if err != nil { 256 return nil, stacktrace.Propagate(err, "") 257 } 258 259 if plcURL == "" { 260 // we're not doing imports 261 return nil, nil 262 } 263 264 cursor, err := store.Tree.AuthoritativeImportProgress(readTx) 265 if err != nil { 266 return nil, stacktrace.Propagate(err, "") 267 } 268 269 aoc := getOrCreateAuthoritativeOperationsCache(d.runnerContext, d.aocsByPLC, plcURL) 270 271 entries, err := aoc.get(ctx, cursor, OpsPerImportTx) 272 if err != nil { 273 return nil, stacktrace.Propagate(err, "") 274 } 275 276 if len(entries) == 0 { 277 // nothing to import at the moment 278 return nil, nil 279 } 280 281 hashBytes, err := computeLogEntriesHash(entries) 282 if err != nil { 283 return nil, stacktrace.Propagate(err, "") 284 } 285 286 tx := Transaction[AuthoritativeImportArguments]{ 287 Action: TransactionActionAuthoritativeImport, 288 Arguments: AuthoritativeImportArguments{ 289 PLCURL: plcURL, 290 Hash: hex.EncodeToString(hashBytes), 291 Cursor: cursor, 292 Count: uint64(len(entries)), 293 }, 294 } 295 296 out, err := cbornode.DumpObject(tx) 297 if err != nil { 298 return nil, stacktrace.Propagate(err, "") 299 } 300 return out, nil 301}