A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
package bundle

import (
	"context"
	"encoding/binary"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"sync"
)

const (
	BUILD_BATCH_SIZE = 100 // Process 100 bundles at a time (not used in streaming approach)
)

// ShardBuilder accumulates DID positions for a shard
type ShardBuilder struct {
	entries map[string][]OpLocation
	mu      sync.Mutex
}

// newShardBuilder creates a new shard builder
func newShardBuilder() *ShardBuilder {
	return &ShardBuilder{
		entries: make(map[string][]OpLocation),
	}
}

// add adds a location to the shard
func (sb *ShardBuilder) add(identifier string, bundle uint16, position uint16, nullified bool) {
	sb.mu.Lock()
	defer sb.mu.Unlock()

	sb.entries[identifier] = append(sb.entries[identifier], OpLocation{
		Bundle:    bundle,
		Position:  position,
		Nullified: nullified,
	})
}

// BuildIndexFromScratch builds index with controlled memory usage
func (dim *DIDIndexManager) BuildIndexFromScratch(ctx context.Context, mgr *Manager, progressCallback func(current, total int)) error {
	dim.logger.Printf("Building DID index from scratch (memory-efficient mode)...")

	bundles := mgr.index.GetBundles()
	if len(bundles) == 0 {
		return fmt.Errorf("no bundles to index")
	}

	// Create temporary shard files
	tempShards := make([]*os.File, DID_SHARD_COUNT)
	for i := 0; i < DID_SHARD_COUNT; i++ {
		tempPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.tmp", i))
		f, err := os.Create(tempPath)
		if err != nil {
			for j := 0; j < i; j++ {
				tempShards[j].Close()
				os.Remove(filepath.Join(dim.shardDir, fmt.Sprintf("%02x.tmp", j)))
			}
			return fmt.Errorf("failed to create temp shard: %w", err)
		}
		tempShards[i] = f
	}

	dim.logger.Printf("Pass 1/2: Scanning %d bundles...", len(bundles))

	// Stream all operations to temp files
	for i, meta := range bundles {
		select {
		case <-ctx.Done():
			for _, f := range tempShards {
				f.Close()
				os.Remove(f.Name())
			}
			return ctx.Err()
		default:
		}

		if progressCallback != nil {
			progressCallback(i+1, len(bundles))
		}

		// Load bundle
		bundle, err := mgr.LoadBundle(ctx, meta.BundleNumber)
		if err != nil {
			dim.logger.Printf("Warning: failed to load bundle %d: %v", meta.BundleNumber, err)
			continue
		}

		// Process each operation
		for pos, op := range bundle.Operations {
			identifier, err := extractDIDIdentifier(op.DID)
			if err != nil {
				continue
			}

			shardNum := dim.calculateShard(identifier)

			// Write entry: [24 bytes ID][2 bytes bundle][2 bytes pos][1 byte nullified]
			entry := make([]byte, 29)
			copy(entry[0:24], identifier)
			binary.LittleEndian.PutUint16(entry[24:26], uint16(meta.BundleNumber))
			binary.LittleEndian.PutUint16(entry[26:28], uint16(pos))

			// Store nullified flag
			if op.IsNullified() {
				entry[28] = 1
			} else {
				entry[28] = 0
			}

			if _, err := tempShards[shardNum].Write(entry); err != nil {
				dim.logger.Printf("Warning: failed to write to temp shard %02x: %v", shardNum, err)
			}
		}
	}

	// Close temp files
	for _, f := range tempShards {
		f.Close()
	}

	dim.logger.Printf("\n")
	dim.logger.Printf("Pass 2/2: Consolidating %d shards...", DID_SHARD_COUNT)

	// Consolidate shards
	totalDIDs := int64(0)
	for i := 0; i < DID_SHARD_COUNT; i++ {
		// Log every 32 shards
		if i%32 == 0 || i == DID_SHARD_COUNT-1 {
			dim.logger.Printf(" Consolidating shards: %d/%d (%.1f%%)",
				i+1,
				DID_SHARD_COUNT, float64(i+1)/float64(DID_SHARD_COUNT)*100)
		}

		count, err := dim.consolidateShard(uint8(i))
		if err != nil {
			return fmt.Errorf("failed to consolidate shard %02x: %w", i, err)
		}
		totalDIDs += count
	}

	dim.config.TotalDIDs = totalDIDs
	dim.config.LastBundle = bundles[len(bundles)-1].BundleNumber

	if err := dim.saveIndexConfig(); err != nil {
		return fmt.Errorf("failed to save config: %w", err)
	}

	dim.logger.Printf("✓ Index built: %d DIDs indexed", totalDIDs)

	return nil
}

// consolidateShard reads temp file, sorts, and writes final shard
func (dim *DIDIndexManager) consolidateShard(shardNum uint8) (int64, error) {
	tempPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.tmp", shardNum))

	// Read all entries from temp file
	data, err := os.ReadFile(tempPath)
	if err != nil {
		if os.IsNotExist(err) {
			return 0, nil // No entries for this shard
		}
		return 0, err
	}

	if len(data) == 0 {
		os.Remove(tempPath)
		return 0, nil
	}

	// Parse entries (29 bytes each)
	entryCount := len(data) / 29
	if len(data)%29 != 0 {
		return 0, fmt.Errorf("corrupted temp shard: size not multiple of 29")
	}

	type tempEntry struct {
		identifier string
		bundle     uint16
		position   uint16
		nullified  bool
	}

	entries := make([]tempEntry, entryCount)
	for i := 0; i < entryCount; i++ {
		offset := i * 29
		entries[i] = tempEntry{
			identifier: string(data[offset : offset+24]),
			bundle:     binary.LittleEndian.Uint16(data[offset+24 : offset+26]),
			position:   binary.LittleEndian.Uint16(data[offset+26 : offset+28]),
			nullified:  data[offset+28] != 0,
		}
	}

	// Free the data slice
	data = nil

	// Sort by identifier
	sort.Slice(entries, func(i, j int) bool {
		return entries[i].identifier < entries[j].identifier
	})

	// Group by DID
	builder := newShardBuilder()
	for _, entry := range entries {
		builder.add(entry.identifier, entry.bundle, entry.position, entry.nullified)
	}

	// Free entries
	entries = nil

	// Write final shard
	if err := dim.writeShard(shardNum, builder); err != nil {
		return 0, err
	}

	// Clean up temp file
	os.Remove(tempPath)

	return int64(len(builder.entries)), nil
}

// UpdateIndexForBundle adds operations from a new bundle (incremental)
func (dim *DIDIndexManager) UpdateIndexForBundle(ctx context.Context, bundle *Bundle) error {
	// Group operations by shard
	shardOps := make(map[uint8]map[string][]OpLocation)

	for pos, op := range bundle.Operations {
		identifier, err := extractDIDIdentifier(op.DID)
		if err != nil {
			continue
		}

		shardNum := dim.calculateShard(identifier)

		if shardOps[shardNum] == nil {
			shardOps[shardNum] = make(map[string][]OpLocation)
		}

		shardOps[shardNum][identifier] = append(shardOps[shardNum][identifier], OpLocation{
			Bundle:    uint16(bundle.BundleNumber),
			Position:  uint16(pos),
			Nullified: op.IsNullified(),
		})
	}

	// Update affected shards (one at a time to save memory)
	for shardNum, newOps := range shardOps {
		if err := dim.updateShard(shardNum, newOps); err != nil {
			return fmt.Errorf("failed to update shard %02x: %w", shardNum, err)
		}
	}

	// Update config
	dim.config.LastBundle = bundle.BundleNumber
	return dim.saveIndexConfig()
}

// updateShard updates a single shard with new operations
func (dim *DIDIndexManager) updateShard(shardNum uint8, newOps map[string][]OpLocation) error {
	// Load existing shard data
	shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum))

	existingBuilder := newShardBuilder()

	// Read existing shard if it exists
	if data, err := os.ReadFile(shardPath); err == nil && len(data) > 32 {
		if err := dim.parseShardData(data, existingBuilder); err != nil {
			return err
		}
	}

	// Merge new operations
	for identifier, locations := range newOps {
		existingBuilder.entries[identifier] = append(existingBuilder.entries[identifier], locations...)
	}

	// Write updated shard
	return dim.writeShard(shardNum, existingBuilder)
}

// parseShardData parses binary shard data into builder
func (dim *DIDIndexManager) parseShardData(data []byte, builder *ShardBuilder) error {
	if len(data) < 32 {
		return nil
	}

	entryCount := binary.LittleEndian.Uint32(data[9:13]) // Fixed offset
	offset := 32

	for i := 0; i < int(entryCount); i++ {
		if offset+DID_IDENTIFIER_LEN+2 > len(data) {
			break
		}

		// Read identifier
		identifier := string(data[offset : offset+DID_IDENTIFIER_LEN])
		offset += DID_IDENTIFIER_LEN

		// Read location count
		locCount := binary.LittleEndian.Uint16(data[offset : offset+2])
		offset += 2

		// Read locations
		locations := make([]OpLocation, locCount)
		for j := 0; j < int(locCount); j++ {
			if offset+5 > len(data) {
				break
			}

			locations[j] = OpLocation{
				Bundle:    binary.LittleEndian.Uint16(data[offset : offset+2]),
				Position:  binary.LittleEndian.Uint16(data[offset+2 : offset+4]),
				Nullified: data[offset+4] != 0,
			}
			offset += 5
		}

		builder.entries[identifier] = locations
	}

	return nil
}

// writeShard writes a shard to disk in binary format
func (dim *DIDIndexManager) writeShard(shardNum uint8, builder *ShardBuilder) error {
	// Sort identifiers for binary search
	identifiers := make([]string, 0, len(builder.entries))
	for id := range builder.entries {
		identifiers = append(identifiers, id)
	}
	sort.Strings(identifiers)

	// Calculate size
	totalSize := 32 // Header
	for _, id := range identifiers {
		locations := builder.entries[id]
		totalSize += DID_IDENTIFIER_LEN + 2 + (len(locations) * 5)
	}

	// Allocate buffer
	buf := make([]byte, totalSize)

	// Write header
	copy(buf[0:4], DIDINDEX_MAGIC)
	binary.LittleEndian.PutUint32(buf[4:8], DIDINDEX_VERSION)
	buf[8] = shardNum
	binary.LittleEndian.PutUint32(buf[9:13], uint32(len(identifiers)))
	// Reserved bytes 13-32 stay zero

	// Write entries
	offset := 32
	for _, identifier := range identifiers {
		locations := builder.entries[identifier]

		// Write identifier (24 bytes)
		copy(buf[offset:offset+DID_IDENTIFIER_LEN], identifier)
		offset += DID_IDENTIFIER_LEN

		// Write location count
		binary.LittleEndian.PutUint16(buf[offset:offset+2], uint16(len(locations)))
		offset += 2

		// Write locations
		for _, loc := range locations {
			binary.LittleEndian.PutUint16(buf[offset:offset+2], loc.Bundle)
			binary.LittleEndian.PutUint16(buf[offset+2:offset+4], loc.Position)

			// Write nullified flag
			if loc.Nullified {
				buf[offset+4] = 1
			} else {
				buf[offset+4] = 0
			}

			offset += 5
		}
	}

	// Write atomically (tmp → rename)
	shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum))
	tempPath := shardPath + ".tmp"

	if err := os.WriteFile(tempPath, buf, 0644); err != nil {
		return err
	}

	if err := os.Rename(tempPath, shardPath); err != nil {
		os.Remove(tempPath)
		return err
	}

	// Invalidate cache for this shard
	dim.invalidateShard(shardNum)

	return nil
}

// invalidateShard removes a shard from cache
func (dim *DIDIndexManager) invalidateShard(shardNum uint8) {
	dim.cacheMu.Lock()
	defer dim.cacheMu.Unlock()

	if cached, exists := dim.shardCache[shardNum]; exists {
		dim.unmapShard(cached)
		delete(dim.shardCache, shardNum)

		// Remove from LRU order
		for i, s := range dim.cacheOrder {
			if s == shardNum {
				dim.cacheOrder = append(dim.cacheOrder[:i], dim.cacheOrder[i+1:]...)
				break
			}
		}
	}
}
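
To show how the two entry points above fit together, here is a minimal usage sketch: a full two-pass rebuild followed by an incremental update for a freshly fetched bundle. NewManager and NewDIDIndexManager are hypothetical constructors (the file shown does not define them, and the real package may wire things up differently), and the progress callback simply prints a counter.

// buildOrUpdateIndex is a usage sketch, not part of the file above.
// It assumes hypothetical constructors and an already-downloaded *Bundle.
func buildOrUpdateIndex(ctx context.Context, dataDir string, latest *Bundle) error {
	mgr, err := NewManager(dataDir) // hypothetical constructor
	if err != nil {
		return err
	}

	dim, err := NewDIDIndexManager(filepath.Join(dataDir, "did-index")) // hypothetical constructor
	if err != nil {
		return err
	}

	// Full rebuild: two passes over every bundle; per-shard temp files on
	// disk keep memory usage bounded regardless of directory size.
	progress := func(current, total int) {
		fmt.Printf("\rscanning bundle %d/%d", current, total)
	}
	if err := dim.BuildIndexFromScratch(ctx, mgr, progress); err != nil {
		return err
	}

	// Incremental path: index one new bundle without rebuilding everything.
	return dim.UpdateIndexForBundle(ctx, latest)
}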