1package main
2
3import (
4 "context"
5 "encoding/base64"
6 "encoding/binary"
7 "errors"
8 "fmt"
9 "log/slog"
10 "sync"
11 "time"
12
13 "github.com/cockroachdb/pebble"
14)
15
// makeCollectionInternKey builds the intern-table key C{collection}.
func makeCollectionInternKey(collection string) []byte {
	key := make([]byte, 0, 1+len(collection))
	key = append(key, 'C')
	return append(key, collection...)
}
22
// parseCollectionInternKey returns the collection name held in a C{collection}
// key. It panics when the first byte is not 'C' (and, through the index
// expression, when key is empty).
func parseCollectionInternKey(key []byte) string {
	if tag := key[0]; tag != 'C' {
		panic(fmt.Sprintf("collection key must start with C, got %v", tag))
	}
	return string(key[1:])
}
29
// makePrimaryPebbleRow builds the primary-index key
// A{uint32 collectionId}{uint64 seenMs}{did} with big-endian integers.
// Because the collection id and then the timestamp lead the key, one
// collection's rows sort by seen time (for non-negative times).
func makePrimaryPebbleRow(collectionId uint32, did string, seenMs int64) []byte {
	const header = 1 + 4 + 8
	row := make([]byte, header, header+len(did))
	row[0] = 'A'
	binary.BigEndian.PutUint32(row[1:5], collectionId)
	binary.BigEndian.PutUint64(row[5:header], uint64(seenMs))
	return append(row, did...)
}
40
// parsePrimaryPebbleRow splits a key built by makePrimaryPebbleRow into the
// collection id (bytes 1-4), the seen time in Unix milliseconds (bytes 5-12)
// and the DID (the rest). It panics if the key does not start with 'A' or is
// shorter than the 13-byte header.
func parsePrimaryPebbleRow(row []byte) (collectionId uint32, did string, seenMs int64) {
	const (
		idStart   = 1
		timeStart = idStart + 4
		didStart  = timeStart + 8
	)
	if tag := row[0]; tag != 'A' {
		panic(fmt.Sprintf("primary row key wanted A got %v", tag))
	}
	collectionId = binary.BigEndian.Uint32(row[idStart:timeStart])
	seenMs = int64(binary.BigEndian.Uint64(row[timeStart:didStart]))
	did = string(row[didStart:])
	return collectionId, did, seenMs
}
50
// makeByDidKey builds the by-DID key D{did}{uint32 collectionId}, with the
// collection id as the trailing 4 big-endian bytes.
func makeByDidKey(did string, collectionId uint32) []byte {
	key := make([]byte, 0, 1+len(did)+4)
	key = append(key, 'D')
	key = append(key, did...)
	return binary.BigEndian.AppendUint32(key, collectionId)
}
59
// parseByDidKey decodes a D{did}{uint32 collectionId} key as built by
// makeByDidKey. The collection id is the trailing 4 big-endian bytes;
// everything between the leading 'D' and those bytes is the DID.
// It panics if the key does not start with 'D' or is shorter than 5 bytes.
func parseByDidKey(key []byte) (did string, collectionId uint32) {
	if key[0] != 'D' {
		panic(fmt.Sprintf("by did key wanted D got %v", key[0]))
	}
	if len(key) < 1+4 {
		panic(fmt.Sprintf("by did key too short: %d bytes", len(key)))
	}
	// The id occupies exactly the last 4 bytes. Starting one byte earlier
	// would fold the DID's final byte into the id.
	idStart := len(key) - 4
	collectionId = binary.BigEndian.Uint32(key[idStart:])
	did = string(key[1:idStart])
	return did, collectionId
}
69
// PebbleCollectionDirectory holds a DID<=>{collections} directory in pebble db.
// The primary index is ordered by (collectionId, seen time int64 milliseconds, did),
// so one collection's DIDs can be range-scanned in seen-time order.
// Inner schema (integers are big-endian):
//
//	C{collection} : {uint32 collectionId}
//	D{did}{uint32 collectionId} : {uint64 seen ms}
//	A{uint32 collectionId}{uint64 seen ms}{did} : 't'
//	Xseq : {int64 sequence stored as uint64}
//
// Open must be called before use: it creates the in-memory maps below and
// fills them from the C rows.
type PebbleCollectionDirectory struct {
	db *pebble.DB

	// In-memory cache of the C rows, in both directions.
	// collections can be LRU cache if it ever becomes too big
	collections     map[string]uint32
	collectionNames map[uint32]string // TODO: B-tree would be nice
	// maxCollectionId is the largest id known so far; CollectionToId assigns
	// maxCollectionId+1 to a newly interned collection.
	maxCollectionId uint32
	// collectionsLock is held by CollectionToId while it reads or updates
	// collections, collectionNames and maxCollectionId.
	collectionsLock sync.Mutex

	// log is set to slog.Default() by Open when left nil.
	log *slog.Logger
}
87
88func (pcd *PebbleCollectionDirectory) Open(pebblePath string) error {
89 db, err := pebble.Open(pebblePath, &pebble.Options{})
90 if err != nil {
91 return fmt.Errorf("%s: could not open db, %w", pebblePath, err)
92 }
93 pcd.db = db
94 pcd.collections = make(map[string]uint32)
95 pcd.collectionNames = make(map[uint32]string)
96 if pcd.log == nil {
97 pcd.log = slog.Default()
98 }
99 return pcd.readAllCollectionInterns(context.Background())
100}
101
102func (pcd *PebbleCollectionDirectory) Close() error {
103 err := pcd.db.Flush()
104 if err != nil {
105 pcd.log.Error("pebble flush", "err", err)
106 }
107 err = pcd.db.Close()
108 if err != nil {
109 pcd.log.Error("pebble close", "err", err)
110 }
111 return err
112}
113
114// readAllCollectionInterns should only be run at setup time inside Open() when locking against threads is not needed
115func (pcd *PebbleCollectionDirectory) readAllCollectionInterns(ctx context.Context) error {
116 lower := []byte{'C'}
117 upper := []byte{'D'}
118 iter, err := pcd.db.NewIterWithContext(ctx, &pebble.IterOptions{
119 LowerBound: lower,
120 UpperBound: upper,
121 })
122 if err != nil {
123 return fmt.Errorf("collection iter start, %w", err)
124 }
125 defer iter.Close()
126 count := 0
127 for iter.First(); iter.Valid(); iter.Next() {
128 key := iter.Key()
129 value, err := iter.ValueAndErr()
130 if err != nil {
131 return fmt.Errorf("collection iter, %w", err)
132 }
133 collection := parseCollectionInternKey(key)
134 collectionId := binary.BigEndian.Uint32(value)
135 count++
136 pcd.collections[collection] = collectionId
137 pcd.collectionNames[collectionId] = collection
138 if collectionId > pcd.maxCollectionId {
139 pcd.maxCollectionId = collectionId
140 }
141 pcd.log.Debug("collection", "name", collection, "id", collectionId)
142 }
143 pcd.log.Debug("read collections", "count", count, "max", pcd.maxCollectionId)
144 return nil
145}
146
// CollectionDidTime is one decoded primary (A) row.
type CollectionDidTime struct {
	// Collection is the collection name resolved from the row's collection
	// id; it is empty if the id is not in the in-memory name map.
	Collection string
	// Did is the DID recorded for the collection.
	Did string
	// UnixMillis is the seen time stored in the row, in Unix milliseconds.
	UnixMillis int64
}
152
153func (pcd *PebbleCollectionDirectory) ReadAllPrimary(ctx context.Context, out chan<- CollectionDidTime) error {
154 defer close(out)
155 lower := []byte{'A'}
156 upper := []byte{'B'}
157 iter, err := pcd.db.NewIterWithContext(ctx, &pebble.IterOptions{
158 LowerBound: lower,
159 UpperBound: upper,
160 })
161 if err != nil {
162 return fmt.Errorf("collection iter start, %w", err)
163 }
164 defer iter.Close()
165 count := 0
166 done := ctx.Done()
167 for iter.First(); iter.Valid(); iter.Next() {
168 key := iter.Key()
169 collectionId, did, seenMs := parsePrimaryPebbleRow(key)
170 count++
171 collection := pcd.collectionNames[collectionId]
172 rec := CollectionDidTime{
173 Collection: collection,
174 Did: did,
175 UnixMillis: seenMs,
176 }
177 select {
178 case <-done:
179 return nil
180 case out <- rec:
181 }
182 }
183 pcd.log.Debug("read primary", "count", count)
184 return nil
185}
186
187func (pcd *PebbleCollectionDirectory) ReadCollection(ctx context.Context, collection, cursor string, limit int) (result []CollectionDidTime, nextCursor string, err error) {
188 var lower []byte
189 collectionId, err := pcd.CollectionToId(collection, false)
190 if err != nil {
191 if err == ErrNotFound {
192 return nil, "", nil
193 }
194 return nil, "", fmt.Errorf("collection id err, %w", err)
195 }
196 if cursor != "" {
197 lower, err = base64.StdEncoding.DecodeString(cursor)
198 if err != nil {
199 return nil, "", fmt.Errorf("could not decode cursor, %w", err)
200 }
201 } else {
202 lower = make([]byte, 1+4)
203 lower[0] = 'A'
204 binary.BigEndian.PutUint32(lower[1:], collectionId)
205 }
206 var upper [5]byte
207 upper[0] = 'A'
208 binary.BigEndian.PutUint32(upper[1:], collectionId+1)
209 iter, err := pcd.db.NewIterWithContext(ctx, &pebble.IterOptions{
210 LowerBound: lower,
211 UpperBound: upper[:],
212 })
213 if err != nil {
214 return nil, "", fmt.Errorf("collection iter start, %w", err)
215 }
216 defer iter.Close()
217 count := 0
218 done := ctx.Done()
219 result = make([]CollectionDidTime, 0, limit)
220 for iter.First(); iter.Valid(); iter.Next() {
221 key := iter.Key()
222 collectionId, did, seenMs := parsePrimaryPebbleRow(key)
223 count++
224 collection := pcd.collectionNames[collectionId]
225 rec := CollectionDidTime{
226 Collection: collection,
227 Did: did,
228 UnixMillis: seenMs,
229 }
230 result = append(result, rec)
231 breaker := false
232 if count >= limit {
233 breaker = true
234 } else {
235 select {
236 case <-done:
237 breaker = true
238 default:
239 }
240 }
241 if breaker {
242 prevKey := make([]byte, len(key), len(key)+1)
243 copy(prevKey, key)
244 prevKey = append(prevKey, 0)
245 nextCursor = base64.StdEncoding.EncodeToString(prevKey)
246 break
247 }
248 }
249 pcd.log.Debug("read primary", "count", count)
250 return result, nextCursor, nil
251}
252
// ErrNotFound is returned by CollectionToId when the collection has no
// interned id and create is false.
var ErrNotFound = errors.New("not found")
254
255func (pcd *PebbleCollectionDirectory) CollectionToId(collection string, create bool) (uint32, error) {
256 pcd.collectionsLock.Lock()
257 defer pcd.collectionsLock.Unlock()
258 // easy mode: in cache
259 collectionId, ok := pcd.collections[collection]
260 if ok {
261 return collectionId, nil
262 }
263
264 // read from db
265 key := makeCollectionInternKey(collection)
266 value, closer, err := pcd.db.Get(key)
267 if closer != nil {
268 defer closer.Close()
269 }
270 if err == nil {
271 collectionId = binary.BigEndian.Uint32(value)
272 return collectionId, nil
273 }
274
275 if !create {
276 return 0, ErrNotFound
277 }
278 // make new id, write to db
279 if errors.Is(err, pebble.ErrNotFound) {
280 // ok, fall through
281 } else if err != nil {
282 return 0, fmt.Errorf("pebble get err, %w", err)
283 }
284 collectionId = pcd.maxCollectionId + 1
285 pcd.maxCollectionId = collectionId
286 var cib [4]byte
287 binary.BigEndian.PutUint32(cib[:], collectionId)
288 err = pcd.db.Set(key, cib[:], pebble.NoSync)
289 if err != nil {
290 return 0, fmt.Errorf("pebble set err, %w", err)
291 }
292 pcd.collections[collection] = collectionId
293 pcd.collectionNames[collectionId] = collection
294 return collectionId, nil
295}
296
// trueValue is the value stored for every primary (A) row; readers only use
// the key, which carries all the data.
var trueValue = [1]byte{'t'}
298
299func (pcd *PebbleCollectionDirectory) CountDidCollections(did string) (int, error) {
300 lower := make([]byte, 1+len(did))
301 lower[0] = 'D'
302 copy(lower[1:1+len(did)], did)
303 upper := make([]byte, len(lower))
304 copy(upper, lower)
305 upper[len(upper)-1]++
306 ctx := context.Background()
307 iter, err := pcd.db.NewIterWithContext(ctx, &pebble.IterOptions{
308 LowerBound: lower,
309 UpperBound: upper,
310 })
311 if err != nil {
312 return 0, fmt.Errorf("did iter start, %w", err)
313 }
314 defer iter.Close()
315 count := 0
316 for iter.First(); iter.Valid(); iter.Next() {
317 //key := iter.Key()
318 //xdid, xcollectionId := parseByDidKey(key)
319 count++
320 }
321 return count, nil
322}
323
324func (pcd *PebbleCollectionDirectory) MaybeSetCollection(did, collection string) error {
325 collectionId, err := pcd.CollectionToId(collection, true)
326 if err != nil {
327 return err
328 }
329 dkey := makeByDidKey(did, collectionId)
330 _, closer, err := pcd.db.Get(dkey)
331 if closer != nil {
332 defer closer.Close()
333 }
334 if err == nil {
335 // already exists, done
336 pebbleDup.Inc()
337 return nil
338 }
339 if errors.Is(err, pebble.ErrNotFound) {
340 // ok, fall through
341 } else if err != nil {
342 return fmt.Errorf("pebble get err, %w", err)
343 }
344
345 now := time.Now()
346 pkey := makePrimaryPebbleRow(collectionId, did, now.UnixMilli())
347 err = pcd.db.Set(pkey, trueValue[:], pebble.NoSync)
348 if err != nil {
349 return fmt.Errorf("pebble set err, %w", err)
350 }
351 var timebytes [8]byte
352 binary.BigEndian.PutUint64(timebytes[:], uint64(now.UnixMilli()))
353 err = pcd.db.Set(dkey, timebytes[:], pebble.NoSync)
354 if err != nil {
355 return fmt.Errorf("pebble set err, %w", err)
356 }
357 pebbleNew.Inc()
358 return nil
359}
360
361func (pcd *PebbleCollectionDirectory) SetFromResults(results <-chan DidCollection) {
362 errcount := 0
363 for result := range results {
364 err := pcd.MaybeSetCollection(result.Did, result.Collection)
365 if err != nil {
366 errcount++
367 pcd.log.Error("set collection", "err", err)
368 if errcount > 0 {
369 // TODO: signal backpressure and shutdown
370 return
371 }
372 } else {
373 errcount = 0
374 }
375 }
376}
377
// CollectionStats is the result of GetCollectionStats.
type CollectionStats struct {
	// CollectionCounts maps collection name to its number of primary (A) rows.
	CollectionCounts map[string]uint64 `json:"collections"`
}
381
382func (pcd *PebbleCollectionDirectory) GetCollectionStats() (stats CollectionStats, err error) {
383 ctx := context.Background()
384 records := make(chan CollectionDidTime, 1000)
385 go pcd.ReadAllPrimary(ctx, records)
386
387 stats.CollectionCounts = make(map[string]uint64)
388
389 for rec := range records {
390 stats.CollectionCounts[rec.Collection]++
391 }
392
393 return stats, nil
394}
395
// seqKey is the pebble key under which SetSequence/GetSequence keep the
// sequence number.
const seqKey = "Xseq"
397
398func (pcd *PebbleCollectionDirectory) SetSequence(seq int64) error {
399 var seqb [8]byte
400 binary.BigEndian.PutUint64(seqb[:], uint64(seq))
401 return pcd.db.Set([]byte(seqKey), seqb[:], pebble.NoSync)
402}
403func (pcd *PebbleCollectionDirectory) GetSequence() (int64, bool, error) {
404 vbytes, closer, err := pcd.db.Get([]byte(seqKey))
405 if closer != nil {
406 defer closer.Close()
407 }
408 if errors.Is(err, pebble.ErrNotFound) {
409 return 0, false, nil
410 }
411 if err != nil {
412 return 0, false, fmt.Errorf("pebble seq err, %w", err)
413 }
414 seq := int64(binary.BigEndian.Uint64(vbytes))
415 return seq, true, nil
416}