// Package abciapp implements a very experimental PLC directory that uses BFT consensus for decentralization.
1package abciapp
2
3import (
4 "bufio"
5 "context"
6 "crypto/sha256"
7 "encoding/binary"
8 "encoding/hex"
9 "encoding/json"
10 "fmt"
11 "net/http"
12 "net/url"
13 "sync"
14 "time"
15
16 "github.com/bluesky-social/indigo/atproto/syntax"
17 "github.com/did-method-plc/go-didplc"
18 "github.com/ipfs/go-cid"
19 cbornode "github.com/ipfs/go-ipld-cbor"
20 "github.com/palantir/stacktrace"
21 "tangled.org/gbl08ma.com/didplcbft/store"
22)
23
// Tuning knobs for pulling operations from the authoritative PLC source.
const EagerFetchMaxOps = 10000 // stop eagerly fetching once this many ops are cached
const OpsPerImportTx = 1000    // maximum number of ops referenced by a single import transaction
const OpsPerEagerFetch = 1000  // number of ops requested per background eager fetch
27
// authoritativeOperationsCache caches PLC log operations fetched from an
// authoritative PLC directory, keyed by their sequence number, so that
// import transactions can be built without re-fetching.
// A background goroutine (started by newAuthoritativeOperationsCache)
// eagerly tops up the cache.
type authoritativeOperationsCache struct {
	mu sync.Mutex // guards all fields below

	plcURL string // base URL of the authoritative PLC directory
	// operations maps seq -> log entry; entries are evicted once committed
	// (see dropSeqBelowOrEqual)
	operations map[uint64]logEntryWithSeq
	// highestFetchedHeight is the largest seq seen so far; eager fetches
	// resume from here
	highestFetchedHeight uint64
}
35
// logEntryWithSeq is a PLC log entry augmented with the sequence number
// assigned by the authoritative source's /export endpoint.
type logEntryWithSeq struct {
	didplc.LogEntry
	Seq uint64 `json:"seq"`
}
40
41func newAuthoritativeOperationsCache(ctx context.Context, plc string) *authoritativeOperationsCache {
42 aoc := &authoritativeOperationsCache{
43 plcURL: plc,
44 operations: make(map[uint64]logEntryWithSeq),
45 }
46
47 go func() {
48 ticker := time.NewTicker(500 * time.Millisecond)
49 for {
50 select {
51 case <-ctx.Done():
52 return
53 case <-ticker.C:
54 aoc.eagerlyFetch(ctx)
55 }
56 }
57 }()
58
59 return aoc
60}
61
62func getOrCreateAuthoritativeOperationsCache(ctx context.Context, aocsByPLC map[string]*authoritativeOperationsCache, plc string) *authoritativeOperationsCache {
63 aoc, ok := aocsByPLC[plc]
64 if !ok {
65 aoc = newAuthoritativeOperationsCache(ctx, plc)
66 aocsByPLC[plc] = aoc
67 }
68 return aoc
69}
70
71func (a *authoritativeOperationsCache) eagerlyFetch(ctx context.Context) {
72 a.mu.Lock()
73 defer a.mu.Unlock()
74
75 curOps := len(a.operations)
76 if curOps >= EagerFetchMaxOps {
77 return
78 }
79 _, _ = a.fetchInMutex(ctx, a.highestFetchedHeight, OpsPerEagerFetch)
80}
81
82func (a *authoritativeOperationsCache) dropSeqBelowOrEqual(highestCommittedSeq uint64) {
83 a.mu.Lock()
84 defer a.mu.Unlock()
85
86 for i := range a.operations {
87 if a.operations[i].Seq <= highestCommittedSeq {
88 delete(a.operations, i)
89 }
90 }
91}
92
93func (a *authoritativeOperationsCache) fetchInMutex(ctx context.Context, after, count uint64) (bool, error) {
94 entries, _, err := fetchExportedBatchFromAuthoritativeSource(ctx, a.plcURL, after, count)
95 if err != nil {
96 return false, stacktrace.Propagate(err, "")
97 }
98
99 for _, entry := range entries {
100 a.operations[entry.Seq] = entry
101 a.highestFetchedHeight = max(a.highestFetchedHeight, entry.Seq)
102 }
103 return uint64(len(entries)) < count, nil
104}
105
// get returns up to count consecutive operations with seq > after, fetching
// from the authoritative source on cache misses. It may return fewer than
// count entries when the source has no more operations, or when the source
// skipped sequence numbers.
func (a *authoritativeOperationsCache) get(ctx context.Context, after, count uint64) ([]logEntryWithSeq, error) {
	a.mu.Lock()
	defer a.mu.Unlock()

	result := make([]logEntryWithSeq, 0, count)
	// reachedEnd becomes true once a fetch returned fewer entries than
	// requested, meaning we caught up with the authoritative log and should
	// not keep re-fetching for seqs that don't exist yet.
	reachedEnd := false
	for i := uint64(0); uint64(len(result)) < count; i++ {
		opSeq := after + i + 1
		op, ok := a.operations[opSeq]
		if !ok {
			if reachedEnd {
				// it's because we are asking about ops that don't exist yet, return
				break
			}

			// cache miss: fetch the next batch starting just before opSeq
			re, err := a.fetchInMutex(ctx, after+i, count)
			if err != nil {
				return nil, stacktrace.Propagate(err, "")
			}

			reachedEnd = reachedEnd || re

			op, ok = a.operations[opSeq]
			if !ok {
				// still not present even after fetching
				// the authoritative source probably skipped this seq?
				continue
			}
		}

		result = append(result, op)
	}

	return result, nil
}
141
142func fetchExportedBatchFromAuthoritativeSource(ctx context.Context, plcURL string, startAt, maxCount uint64) ([]logEntryWithSeq, uint64, error) {
143 baseURL, err := url.JoinPath(plcURL, "/export")
144 if err != nil {
145 return nil, 0, stacktrace.Propagate(err, "")
146 }
147
148 client := &http.Client{Timeout: 30 * time.Second}
149
150 entries := make([]logEntryWithSeq, 0, maxCount)
151 for {
152 req, err := http.NewRequestWithContext(ctx, "GET", baseURL, nil)
153 if err != nil {
154 return nil, 0, stacktrace.Propagate(err, "")
155 }
156
157 req.Header.Set("User-Agent", "didplcbft")
158
159 requestCount := min(1000, maxCount-uint64(len(entries)))
160
161 q := req.URL.Query()
162 q.Add("count", fmt.Sprint(requestCount))
163 q.Add("after", fmt.Sprint(startAt))
164 req.URL.RawQuery = q.Encode()
165
166 resp, err := client.Do(req)
167 if err != nil {
168 return nil, 0, stacktrace.Propagate(err, "")
169 }
170 defer resp.Body.Close()
171
172 if resp.StatusCode != http.StatusOK {
173 return nil, 0, stacktrace.NewError("non-200 status code")
174 }
175
176 // Read response body
177 s := bufio.NewScanner(resp.Body)
178 numEntriesThisResponse := 0
179 for s.Scan() && len(entries) < int(maxCount) {
180 var entry logEntryWithSeq
181 if err := json.Unmarshal(s.Bytes(), &entry); err != nil {
182 return nil, 0, stacktrace.Propagate(err, "")
183 }
184 entries = append(entries, entry)
185 numEntriesThisResponse++
186 startAt = entry.Seq
187 }
188 if s.Err() != nil {
189 return nil, 0, stacktrace.Propagate(s.Err(), "")
190 }
191
192 if uint64(numEntriesThisResponse) < requestCount || len(entries) >= int(maxCount) {
193 break
194 }
195 }
196
197 return entries, startAt, nil
198}
199
200func computeLogEntriesHash(logEntries []logEntryWithSeq) ([]byte, error) {
201 // let's _not_ rely on the specifics of the JSON representation
202 // (instead let's rely on specifics of our implementation, heh)
203
204 hash := sha256.New()
205 for i, entry := range logEntries {
206 // Write DID
207 didBytes, err := store.DIDToBytes(entry.DID)
208 if err != nil {
209 return nil, stacktrace.Propagate(err, "invalid DID in entry index %d", i)
210 }
211
212 _, err = hash.Write(didBytes)
213 if err != nil {
214 return nil, stacktrace.Propagate(err, "")
215 }
216
217 // Write CID
218 // (We trust that the authoritative source computed CIDs properly, so we use theirs rather than decoding the operation and recomputing)
219 cid, err := cid.Decode(entry.CID)
220 if err != nil {
221 return nil, stacktrace.Propagate(err, "invalid CID in entry index %d", i)
222 }
223
224 _, err = hash.Write(cid.Bytes())
225 if err != nil {
226 return nil, stacktrace.Propagate(err, "")
227 }
228
229 // Write CreatedAt
230 createdAt, err := syntax.ParseDatetime(entry.CreatedAt)
231 if err != nil {
232 return nil, stacktrace.Propagate(err, "invalid CreatedAt in entry index %d", i)
233 }
234
235 tsBytes := make([]byte, 8)
236 binary.BigEndian.PutUint64(tsBytes, uint64(createdAt.Time().Truncate(1*time.Millisecond).UTC().UnixNano()))
237
238 _, err = hash.Write(tsBytes)
239 if err != nil {
240 return nil, stacktrace.Propagate(err, "")
241 }
242
243 // Nullified can't be part of the hash as it can change on the authoritative source at any moment,
244 // we always import operations as if they weren't nullified and recompute the nullification status as needed
245 }
246
247 return hash.Sum(nil), nil
248}
249
250func (d *DIDPLCApplication) maybeCreateAuthoritativeImportTx(ctx context.Context) ([]byte, error) {
251 // use WorkingTreeVersion so we take into account any import operation that may have been processed in this block
252 readTx := d.txFactory.ReadWorking(time.Now())
253
254 plcURL, err := store.Tree.AuthoritativePLC(readTx)
255 if err != nil {
256 return nil, stacktrace.Propagate(err, "")
257 }
258
259 if plcURL == "" {
260 // we're not doing imports
261 return nil, nil
262 }
263
264 cursor, err := store.Tree.AuthoritativeImportProgress(readTx)
265 if err != nil {
266 return nil, stacktrace.Propagate(err, "")
267 }
268
269 aoc := getOrCreateAuthoritativeOperationsCache(d.runnerContext, d.aocsByPLC, plcURL)
270
271 entries, err := aoc.get(ctx, cursor, OpsPerImportTx)
272 if err != nil {
273 return nil, stacktrace.Propagate(err, "")
274 }
275
276 if len(entries) == 0 {
277 // nothing to import at the moment
278 return nil, nil
279 }
280
281 hashBytes, err := computeLogEntriesHash(entries)
282 if err != nil {
283 return nil, stacktrace.Propagate(err, "")
284 }
285
286 tx := Transaction[AuthoritativeImportArguments]{
287 Action: TransactionActionAuthoritativeImport,
288 Arguments: AuthoritativeImportArguments{
289 PLCURL: plcURL,
290 Hash: hex.EncodeToString(hashBytes),
291 Cursor: cursor,
292 Count: uint64(len(entries)),
293 },
294 }
295
296 out, err := cbornode.DumpObject(tx)
297 if err != nil {
298 return nil, stacktrace.Propagate(err, "")
299 }
300 return out, nil
301}