fork of indigo with slightly nicer lexgen
at main 11 kB view raw
1package main 2 3import ( 4 "context" 5 "encoding/base64" 6 "encoding/binary" 7 "errors" 8 "fmt" 9 "log/slog" 10 "sync" 11 "time" 12 13 "github.com/cockroachdb/pebble" 14) 15 16func makeCollectionInternKey(collection string) []byte { 17 out := make([]byte, len(collection)+1) 18 out[0] = 'C' 19 copy(out[1:], collection) 20 return out 21} 22 23func parseCollectionInternKey(key []byte) string { 24 if key[0] != 'C' { 25 panic(fmt.Sprintf("collection key must start with C, got %v", key[0])) 26 } 27 return string(key[1:]) 28} 29 30func makePrimaryPebbleRow(collectionId uint32, did string, seenMs int64) []byte { 31 out := make([]byte, 1+4+8+len(did)) 32 out[0] = 'A' 33 binary.BigEndian.PutUint32(out[1:], collectionId) 34 pos := 1 + 4 35 binary.BigEndian.PutUint64(out[pos:], uint64(seenMs)) 36 pos += 8 37 copy(out[pos:], did) 38 return out 39} 40 41func parsePrimaryPebbleRow(row []byte) (collectionId uint32, did string, seenMs int64) { 42 if row[0] != 'A' { 43 panic(fmt.Sprintf("primary row key wanted A got %v", row[0])) 44 } 45 collectionId = binary.BigEndian.Uint32(row[1:5]) 46 seenMs = int64(binary.BigEndian.Uint64(row[5:13])) 47 did = string(row[13:]) 48 return collectionId, did, seenMs 49} 50 51func makeByDidKey(did string, collectionId uint32) []byte { 52 out := make([]byte, 1+len(did)+4) 53 out[0] = 'D' 54 copy(out[1:1+len(did)], did) 55 pos := 1 + len(did) 56 binary.BigEndian.PutUint32(out[pos:], collectionId) 57 return out 58} 59 60func parseByDidKey(key []byte) (did string, collectionId uint32) { 61 if key[0] != 'D' { 62 panic(fmt.Sprintf("by did key wanted D got %v", key[0])) 63 } 64 last4 := len(key) - 5 65 collectionId = binary.BigEndian.Uint32(key[last4:]) 66 did = string(key[1 : last4+1]) 67 return did, collectionId 68} 69 70// PebbleCollectionDirectory holds a DID<=>{collections} directory in pebble db. 71// The primary database is (collection, seen time int64 milliseconds, did) 72// Inner schema: 73// C{collection} : {uint32 collectionId} 74// D{did}{uint32 collectionId} : {uint64 seen ms} 75// A{uint32 collectionId}{uint64 seen ms}{did} : 't' 76type PebbleCollectionDirectory struct { 77 db *pebble.DB 78 79 // collections can be LRU cache if it ever becomes too big 80 collections map[string]uint32 81 collectionNames map[uint32]string // TODO: B-tree would be nice 82 maxCollectionId uint32 83 collectionsLock sync.Mutex 84 85 log *slog.Logger 86} 87 88func (pcd *PebbleCollectionDirectory) Open(pebblePath string) error { 89 db, err := pebble.Open(pebblePath, &pebble.Options{}) 90 if err != nil { 91 return fmt.Errorf("%s: could not open db, %w", pebblePath, err) 92 } 93 pcd.db = db 94 pcd.collections = make(map[string]uint32) 95 pcd.collectionNames = make(map[uint32]string) 96 if pcd.log == nil { 97 pcd.log = slog.Default() 98 } 99 return pcd.readAllCollectionInterns(context.Background()) 100} 101 102func (pcd *PebbleCollectionDirectory) Close() error { 103 err := pcd.db.Flush() 104 if err != nil { 105 pcd.log.Error("pebble flush", "err", err) 106 } 107 err = pcd.db.Close() 108 if err != nil { 109 pcd.log.Error("pebble close", "err", err) 110 } 111 return err 112} 113 114// readAllCollectionInterns should only be run at setup time inside Open() when locking against threads is not needed 115func (pcd *PebbleCollectionDirectory) readAllCollectionInterns(ctx context.Context) error { 116 lower := []byte{'C'} 117 upper := []byte{'D'} 118 iter, err := pcd.db.NewIterWithContext(ctx, &pebble.IterOptions{ 119 LowerBound: lower, 120 UpperBound: upper, 121 }) 122 if err != nil { 123 return fmt.Errorf("collection iter start, %w", err) 124 } 125 defer iter.Close() 126 count := 0 127 for iter.First(); iter.Valid(); iter.Next() { 128 key := iter.Key() 129 value, err := iter.ValueAndErr() 130 if err != nil { 131 return fmt.Errorf("collection iter, %w", err) 132 } 133 collection := parseCollectionInternKey(key) 134 collectionId := binary.BigEndian.Uint32(value) 135 count++ 136 pcd.collections[collection] = collectionId 137 pcd.collectionNames[collectionId] = collection 138 if collectionId > pcd.maxCollectionId { 139 pcd.maxCollectionId = collectionId 140 } 141 pcd.log.Debug("collection", "name", collection, "id", collectionId) 142 } 143 pcd.log.Debug("read collections", "count", count, "max", pcd.maxCollectionId) 144 return nil 145} 146 147type CollectionDidTime struct { 148 Collection string 149 Did string 150 UnixMillis int64 151} 152 153func (pcd *PebbleCollectionDirectory) ReadAllPrimary(ctx context.Context, out chan<- CollectionDidTime) error { 154 defer close(out) 155 lower := []byte{'A'} 156 upper := []byte{'B'} 157 iter, err := pcd.db.NewIterWithContext(ctx, &pebble.IterOptions{ 158 LowerBound: lower, 159 UpperBound: upper, 160 }) 161 if err != nil { 162 return fmt.Errorf("collection iter start, %w", err) 163 } 164 defer iter.Close() 165 count := 0 166 done := ctx.Done() 167 for iter.First(); iter.Valid(); iter.Next() { 168 key := iter.Key() 169 collectionId, did, seenMs := parsePrimaryPebbleRow(key) 170 count++ 171 collection := pcd.collectionNames[collectionId] 172 rec := CollectionDidTime{ 173 Collection: collection, 174 Did: did, 175 UnixMillis: seenMs, 176 } 177 select { 178 case <-done: 179 return nil 180 case out <- rec: 181 } 182 } 183 pcd.log.Debug("read primary", "count", count) 184 return nil 185} 186 187func (pcd *PebbleCollectionDirectory) ReadCollection(ctx context.Context, collection, cursor string, limit int) (result []CollectionDidTime, nextCursor string, err error) { 188 var lower []byte 189 collectionId, err := pcd.CollectionToId(collection, false) 190 if err != nil { 191 if err == ErrNotFound { 192 return nil, "", nil 193 } 194 return nil, "", fmt.Errorf("collection id err, %w", err) 195 } 196 if cursor != "" { 197 lower, err = base64.StdEncoding.DecodeString(cursor) 198 if err != nil { 199 return nil, "", fmt.Errorf("could not decode cursor, %w", err) 200 } 201 } else { 202 lower = make([]byte, 1+4) 203 lower[0] = 'A' 204 binary.BigEndian.PutUint32(lower[1:], collectionId) 205 } 206 var upper [5]byte 207 upper[0] = 'A' 208 binary.BigEndian.PutUint32(upper[1:], collectionId+1) 209 iter, err := pcd.db.NewIterWithContext(ctx, &pebble.IterOptions{ 210 LowerBound: lower, 211 UpperBound: upper[:], 212 }) 213 if err != nil { 214 return nil, "", fmt.Errorf("collection iter start, %w", err) 215 } 216 defer iter.Close() 217 count := 0 218 done := ctx.Done() 219 result = make([]CollectionDidTime, 0, limit) 220 for iter.First(); iter.Valid(); iter.Next() { 221 key := iter.Key() 222 collectionId, did, seenMs := parsePrimaryPebbleRow(key) 223 count++ 224 collection := pcd.collectionNames[collectionId] 225 rec := CollectionDidTime{ 226 Collection: collection, 227 Did: did, 228 UnixMillis: seenMs, 229 } 230 result = append(result, rec) 231 breaker := false 232 if count >= limit { 233 breaker = true 234 } else { 235 select { 236 case <-done: 237 breaker = true 238 default: 239 } 240 } 241 if breaker { 242 prevKey := make([]byte, len(key), len(key)+1) 243 copy(prevKey, key) 244 prevKey = append(prevKey, 0) 245 nextCursor = base64.StdEncoding.EncodeToString(prevKey) 246 break 247 } 248 } 249 pcd.log.Debug("read primary", "count", count) 250 return result, nextCursor, nil 251} 252 253var ErrNotFound = errors.New("not found") 254 255func (pcd *PebbleCollectionDirectory) CollectionToId(collection string, create bool) (uint32, error) { 256 pcd.collectionsLock.Lock() 257 defer pcd.collectionsLock.Unlock() 258 // easy mode: in cache 259 collectionId, ok := pcd.collections[collection] 260 if ok { 261 return collectionId, nil 262 } 263 264 // read from db 265 key := makeCollectionInternKey(collection) 266 value, closer, err := pcd.db.Get(key) 267 if closer != nil { 268 defer closer.Close() 269 } 270 if err == nil { 271 collectionId = binary.BigEndian.Uint32(value) 272 return collectionId, nil 273 } 274 275 if !create { 276 return 0, ErrNotFound 277 } 278 // make new id, write to db 279 if errors.Is(err, pebble.ErrNotFound) { 280 // ok, fall through 281 } else if err != nil { 282 return 0, fmt.Errorf("pebble get err, %w", err) 283 } 284 collectionId = pcd.maxCollectionId + 1 285 pcd.maxCollectionId = collectionId 286 var cib [4]byte 287 binary.BigEndian.PutUint32(cib[:], collectionId) 288 err = pcd.db.Set(key, cib[:], pebble.NoSync) 289 if err != nil { 290 return 0, fmt.Errorf("pebble set err, %w", err) 291 } 292 pcd.collections[collection] = collectionId 293 pcd.collectionNames[collectionId] = collection 294 return collectionId, nil 295} 296 297var trueValue = [1]byte{'t'} 298 299func (pcd *PebbleCollectionDirectory) CountDidCollections(did string) (int, error) { 300 lower := make([]byte, 1+len(did)) 301 lower[0] = 'D' 302 copy(lower[1:1+len(did)], did) 303 upper := make([]byte, len(lower)) 304 copy(upper, lower) 305 upper[len(upper)-1]++ 306 ctx := context.Background() 307 iter, err := pcd.db.NewIterWithContext(ctx, &pebble.IterOptions{ 308 LowerBound: lower, 309 UpperBound: upper, 310 }) 311 if err != nil { 312 return 0, fmt.Errorf("did iter start, %w", err) 313 } 314 defer iter.Close() 315 count := 0 316 for iter.First(); iter.Valid(); iter.Next() { 317 //key := iter.Key() 318 //xdid, xcollectionId := parseByDidKey(key) 319 count++ 320 } 321 return count, nil 322} 323 324func (pcd *PebbleCollectionDirectory) MaybeSetCollection(did, collection string) error { 325 collectionId, err := pcd.CollectionToId(collection, true) 326 if err != nil { 327 return err 328 } 329 dkey := makeByDidKey(did, collectionId) 330 _, closer, err := pcd.db.Get(dkey) 331 if closer != nil { 332 defer closer.Close() 333 } 334 if err == nil { 335 // already exists, done 336 pebbleDup.Inc() 337 return nil 338 } 339 if errors.Is(err, pebble.ErrNotFound) { 340 // ok, fall through 341 } else if err != nil { 342 return fmt.Errorf("pebble get err, %w", err) 343 } 344 345 now := time.Now() 346 pkey := makePrimaryPebbleRow(collectionId, did, now.UnixMilli()) 347 err = pcd.db.Set(pkey, trueValue[:], pebble.NoSync) 348 if err != nil { 349 return fmt.Errorf("pebble set err, %w", err) 350 } 351 var timebytes [8]byte 352 binary.BigEndian.PutUint64(timebytes[:], uint64(now.UnixMilli())) 353 err = pcd.db.Set(dkey, timebytes[:], pebble.NoSync) 354 if err != nil { 355 return fmt.Errorf("pebble set err, %w", err) 356 } 357 pebbleNew.Inc() 358 return nil 359} 360 361func (pcd *PebbleCollectionDirectory) SetFromResults(results <-chan DidCollection) { 362 errcount := 0 363 for result := range results { 364 err := pcd.MaybeSetCollection(result.Did, result.Collection) 365 if err != nil { 366 errcount++ 367 pcd.log.Error("set collection", "err", err) 368 if errcount > 0 { 369 // TODO: signal backpressure and shutdown 370 return 371 } 372 } else { 373 errcount = 0 374 } 375 } 376} 377 378type CollectionStats struct { 379 CollectionCounts map[string]uint64 `json:"collections"` 380} 381 382func (pcd *PebbleCollectionDirectory) GetCollectionStats() (stats CollectionStats, err error) { 383 ctx := context.Background() 384 records := make(chan CollectionDidTime, 1000) 385 go pcd.ReadAllPrimary(ctx, records) 386 387 stats.CollectionCounts = make(map[string]uint64) 388 389 for rec := range records { 390 stats.CollectionCounts[rec.Collection]++ 391 } 392 393 return stats, nil 394} 395 396const seqKey = "Xseq" 397 398func (pcd *PebbleCollectionDirectory) SetSequence(seq int64) error { 399 var seqb [8]byte 400 binary.BigEndian.PutUint64(seqb[:], uint64(seq)) 401 return pcd.db.Set([]byte(seqKey), seqb[:], pebble.NoSync) 402} 403func (pcd *PebbleCollectionDirectory) GetSequence() (int64, bool, error) { 404 vbytes, closer, err := pcd.db.Get([]byte(seqKey)) 405 if closer != nil { 406 defer closer.Close() 407 } 408 if errors.Is(err, pebble.ErrNotFound) { 409 return 0, false, nil 410 } 411 if err != nil { 412 return 0, false, fmt.Errorf("pebble seq err, %w", err) 413 } 414 seq := int64(binary.BigEndian.Uint64(vbytes)) 415 return seq, true, nil 416}