A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
package bundle

import (
	"context"
	"encoding/binary"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"sync"
)

const (
	BUILD_BATCH_SIZE = 100 // Process 100 bundles at a time (not used in streaming approach)
)

// ShardBuilder accumulates DID positions for a shard
type ShardBuilder struct {
	entries map[string][]OpLocation
	mu      sync.Mutex
}

// newShardBuilder creates a new shard builder
func newShardBuilder() *ShardBuilder {
	return &ShardBuilder{
		entries: make(map[string][]OpLocation),
	}
}

// add adds a location to the shard
func (sb *ShardBuilder) add(identifier string, bundle uint16, position uint16, nullified bool) {
	sb.mu.Lock()
	defer sb.mu.Unlock()

	sb.entries[identifier] = append(sb.entries[identifier], OpLocation{
		Bundle:    bundle,
		Position:  position,
		Nullified: nullified,
	})
}

// BuildIndexFromScratch builds the index with controlled memory usage
func (dim *DIDIndexManager) BuildIndexFromScratch(ctx context.Context, mgr *Manager, progressCallback func(current, total int)) error {
	dim.logger.Printf("Building DID index from scratch (memory-efficient mode)...")

	bundles := mgr.index.GetBundles()
	if len(bundles) == 0 {
		return fmt.Errorf("no bundles to index")
	}

	// Create temporary shard files
	tempShards := make([]*os.File, DID_SHARD_COUNT)
	for i := 0; i < DID_SHARD_COUNT; i++ {
		tempPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.tmp", i))
		f, err := os.Create(tempPath)
		if err != nil {
			for j := 0; j < i; j++ {
				tempShards[j].Close()
				os.Remove(filepath.Join(dim.shardDir, fmt.Sprintf("%02x.tmp", j)))
			}
			return fmt.Errorf("failed to create temp shard: %w", err)
		}
		tempShards[i] = f
	}

	dim.logger.Printf("Pass 1/2: Scanning %d bundles...", len(bundles))

	// Stream all operations to temp files
	for i, meta := range bundles {
		select {
		case <-ctx.Done():
			for _, f := range tempShards {
				f.Close()
				os.Remove(f.Name())
			}
			return ctx.Err()
		default:
		}

		if progressCallback != nil {
			progressCallback(i+1, len(bundles))
		}

		// Load bundle
		bundle, err := mgr.LoadBundle(ctx, meta.BundleNumber)
		if err != nil {
			dim.logger.Printf("Warning: failed to load bundle %d: %v", meta.BundleNumber, err)
			continue
		}

		// Process each operation
		for pos, op := range bundle.Operations {
			identifier, err := extractDIDIdentifier(op.DID)
			if err != nil {
				continue
			}

			shardNum := dim.calculateShard(identifier)

			// Write entry: [24 bytes ID][2 bytes bundle][2 bytes pos][1 byte nullified]
			entry := make([]byte, 29)
			copy(entry[0:24], identifier)
			binary.LittleEndian.PutUint16(entry[24:26], uint16(meta.BundleNumber))
			binary.LittleEndian.PutUint16(entry[26:28], uint16(pos))

			// Store nullified flag
			if op.IsNullified() {
				entry[28] = 1
			} else {
				entry[28] = 0
			}

			if _, err := tempShards[shardNum].Write(entry); err != nil {
				dim.logger.Printf("Warning: failed to write to temp shard %02x: %v", shardNum, err)
			}
		}
	}

	// Close temp files
	for _, f := range tempShards {
		f.Close()
	}

	dim.logger.Printf("\n")
	dim.logger.Printf("Pass 2/2: Consolidating %d shards...", DID_SHARD_COUNT)

	// Consolidate shards
	totalDIDs := int64(0)
	for i := 0; i < DID_SHARD_COUNT; i++ {
		// Log every 32 shards
		if i%32 == 0 || i == DID_SHARD_COUNT-1 {
			dim.logger.Printf("  Consolidating shards: %d/%d (%.1f%%)",
				i+1, DID_SHARD_COUNT, float64(i+1)/float64(DID_SHARD_COUNT)*100)
		}

		count, err := dim.consolidateShard(uint8(i))
		if err != nil {
			return fmt.Errorf("failed to consolidate shard %02x: %w", i, err)
		}
		totalDIDs += count
	}

	dim.config.TotalDIDs = totalDIDs
	dim.config.LastBundle = bundles[len(bundles)-1].BundleNumber

	if err := dim.saveIndexConfig(); err != nil {
		return fmt.Errorf("failed to save config: %w", err)
	}

	dim.logger.Printf("✓ Index built: %d DIDs indexed", totalDIDs)

	return nil
}
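
// A hedged usage sketch (not part of the package above): a caller that
// already holds a *DIDIndexManager and a *Manager might drive a full
// rebuild like this, reporting progress every 100 bundles:
//
//	err := dim.BuildIndexFromScratch(ctx, mgr, func(current, total int) {
//		if current%100 == 0 || current == total {
//			log.Printf("indexed %d/%d bundles", current, total)
//		}
//	})
//	if err != nil {
//		log.Fatalf("index rebuild failed: %v", err)
//	}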

// consolidateShard reads the temp file, sorts it, and writes the final shard
func (dim *DIDIndexManager) consolidateShard(shardNum uint8) (int64, error) {
	tempPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.tmp", shardNum))

	// Read all entries from the temp file
	data, err := os.ReadFile(tempPath)
	if err != nil {
		if os.IsNotExist(err) {
			return 0, nil // No entries for this shard
		}
		return 0, err
	}

	if len(data) == 0 {
		os.Remove(tempPath)
		return 0, nil
	}

	// Parse entries (29 bytes each)
	entryCount := len(data) / 29
	if len(data)%29 != 0 {
		return 0, fmt.Errorf("corrupted temp shard: size not multiple of 29")
	}

	type tempEntry struct {
		identifier string
		bundle     uint16
		position   uint16
		nullified  bool
	}

	entries := make([]tempEntry, entryCount)
	for i := 0; i < entryCount; i++ {
		offset := i * 29
		entries[i] = tempEntry{
			identifier: string(data[offset : offset+24]),
			bundle:     binary.LittleEndian.Uint16(data[offset+24 : offset+26]),
			position:   binary.LittleEndian.Uint16(data[offset+26 : offset+28]),
			nullified:  data[offset+28] != 0,
		}
	}

	// Free the raw data slice
	data = nil

	// Sort by identifier
	sort.Slice(entries, func(i, j int) bool {
		return entries[i].identifier < entries[j].identifier
	})

	// Group by DID
	builder := newShardBuilder()
	for _, entry := range entries {
		builder.add(entry.identifier, entry.bundle, entry.position, entry.nullified)
	}

	// Free the parsed entries
	entries = nil

	// Write the final shard
	if err := dim.writeShard(shardNum, builder); err != nil {
		return 0, err
	}

	// Clean up the temp file
	os.Remove(tempPath)

	return int64(len(builder.entries)), nil
}

// UpdateIndexForBundle adds operations from a new bundle (incremental)
func (dim *DIDIndexManager) UpdateIndexForBundle(ctx context.Context, bundle *Bundle) error {
	// Group operations by shard
	shardOps := make(map[uint8]map[string][]OpLocation)

	for pos, op := range bundle.Operations {
		identifier, err := extractDIDIdentifier(op.DID)
		if err != nil {
			continue
		}

		shardNum := dim.calculateShard(identifier)

		if shardOps[shardNum] == nil {
			shardOps[shardNum] = make(map[string][]OpLocation)
		}

		shardOps[shardNum][identifier] = append(shardOps[shardNum][identifier], OpLocation{
			Bundle:    uint16(bundle.BundleNumber),
			Position:  uint16(pos),
			Nullified: op.IsNullified(),
		})
	}

	// Update affected shards (one at a time to save memory)
	for shardNum, newOps := range shardOps {
		if err := dim.updateShard(shardNum, newOps); err != nil {
			return fmt.Errorf("failed to update shard %02x: %w", shardNum, err)
		}
	}

	// Update config
	dim.config.LastBundle = bundle.BundleNumber
	return dim.saveIndexConfig()
}

// updateShard updates a single shard with new operations
func (dim *DIDIndexManager) updateShard(shardNum uint8, newOps map[string][]OpLocation) error {
	// Load existing shard data
	shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum))

	existingBuilder := newShardBuilder()

	// Read the existing shard if it exists
	if data, err := os.ReadFile(shardPath); err == nil && len(data) > 32 {
		if err := dim.parseShardData(data, existingBuilder); err != nil {
			return err
		}
	}

	// Merge new operations
	for identifier, locations := range newOps {
		existingBuilder.entries[identifier] = append(existingBuilder.entries[identifier], locations...)
	}

	// Write the updated shard
	return dim.writeShard(shardNum, existingBuilder)
}

// parseShardData parses binary shard data into a builder
func (dim *DIDIndexManager) parseShardData(data []byte, builder *ShardBuilder) error {
	if len(data) < 32 {
		return nil
	}

	entryCount := binary.LittleEndian.Uint32(data[9:13]) // Entry count lives at header bytes 9-12
	offset := 32

	for i := 0; i < int(entryCount); i++ {
		if offset+DID_IDENTIFIER_LEN+2 > len(data) {
			break
		}

		// Read identifier
		identifier := string(data[offset : offset+DID_IDENTIFIER_LEN])
		offset += DID_IDENTIFIER_LEN

		// Read location count
		locCount := binary.LittleEndian.Uint16(data[offset : offset+2])
		offset += 2

		// Read locations
		locations := make([]OpLocation, 0, locCount)
		for j := 0; j < int(locCount); j++ {
			if offset+5 > len(data) {
				// Truncated shard: keep only the locations parsed so far
				break
			}

			locations = append(locations, OpLocation{
				Bundle:    binary.LittleEndian.Uint16(data[offset : offset+2]),
				Position:  binary.LittleEndian.Uint16(data[offset+2 : offset+4]),
				Nullified: data[offset+4] != 0,
			})
			offset += 5
		}

		builder.entries[identifier] = locations
	}

	return nil
}

// writeShard writes a shard to disk in binary format
func (dim *DIDIndexManager) writeShard(shardNum uint8, builder *ShardBuilder) error {
	// Sort identifiers for binary search
	identifiers := make([]string, 0, len(builder.entries))
	for id := range builder.entries {
		identifiers = append(identifiers, id)
	}
	sort.Strings(identifiers)

	// Calculate size
	totalSize := 32 // Header
	for _, id := range identifiers {
		locations := builder.entries[id]
		totalSize += DID_IDENTIFIER_LEN + 2 + (len(locations) * 5)
	}

	// Allocate buffer
	buf := make([]byte, totalSize)

	// Write header
	copy(buf[0:4], DIDINDEX_MAGIC)
	binary.LittleEndian.PutUint32(buf[4:8], DIDINDEX_VERSION)
	buf[8] = shardNum
	binary.LittleEndian.PutUint32(buf[9:13], uint32(len(identifiers)))
	// Reserved bytes 13-31 stay zero

	// Write entries
	offset := 32
	for _, identifier := range identifiers {
		locations := builder.entries[identifier]

		// Write identifier (24 bytes)
		copy(buf[offset:offset+DID_IDENTIFIER_LEN], identifier)
		offset += DID_IDENTIFIER_LEN

		// Write location count
		binary.LittleEndian.PutUint16(buf[offset:offset+2], uint16(len(locations)))
		offset += 2

		// Write locations
		for _, loc := range locations {
			binary.LittleEndian.PutUint16(buf[offset:offset+2], loc.Bundle)
			binary.LittleEndian.PutUint16(buf[offset+2:offset+4], loc.Position)

			// Write nullified flag
			if loc.Nullified {
				buf[offset+4] = 1
			} else {
				buf[offset+4] = 0
			}

			offset += 5
		}
	}

	// Write atomically (tmp → rename)
	shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum))
	tempPath := shardPath + ".tmp"

	if err := os.WriteFile(tempPath, buf, 0644); err != nil {
		return err
	}

	if err := os.Rename(tempPath, shardPath); err != nil {
		os.Remove(tempPath)
		return err
	}

	// Invalidate the cache for this shard
	dim.invalidateShard(shardNum)

	return nil
}

// invalidateShard removes a shard from the cache
func (dim *DIDIndexManager) invalidateShard(shardNum uint8) {
	dim.cacheMu.Lock()
	defer dim.cacheMu.Unlock()

	if cached, exists := dim.shardCache[shardNum]; exists {
		dim.unmapShard(cached)
		delete(dim.shardCache, shardNum)

		// Remove from LRU order
		for i, s := range dim.cacheOrder {
			if s == shardNum {
				dim.cacheOrder = append(dim.cacheOrder[:i], dim.cacheOrder[i+1:]...)
				break
			}
		}
	}
}
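
To make the on-disk format concrete, here is a minimal, self-contained sketch of reading one shard file back and looking up a single DID identifier. It assumes only the layout that writeShard produces above (a 32-byte header with the entry count at bytes 9-12, followed by sorted 24-byte identifiers, each with a 2-byte location count and 5-byte location records); the names lookupInShard and Location are illustrative, not part of the package. Because location lists vary in length, the sketch scans linearly, using the sorted identifiers only to exit early once it has passed the target.

package shardread

import (
	"encoding/binary"
	"fmt"
	"os"
)

const identifierLen = 24 // width of one identifier in the shard format

// Location mirrors the 5-byte location record: bundle, position, nullified flag.
type Location struct {
	Bundle    uint16
	Position  uint16
	Nullified bool
}

// lookupInShard scans a shard file and returns the locations recorded for one
// 24-byte identifier, or nil if the identifier is not present.
func lookupInShard(path, identifier string) ([]Location, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	if len(data) < 32 {
		return nil, fmt.Errorf("shard too small: %d bytes", len(data))
	}

	count := int(binary.LittleEndian.Uint32(data[9:13])) // entry count from the header
	offset := 32

	for i := 0; i < count; i++ {
		if offset+identifierLen+2 > len(data) {
			return nil, fmt.Errorf("truncated shard")
		}
		id := string(data[offset : offset+identifierLen])
		offset += identifierLen

		locCount := int(binary.LittleEndian.Uint16(data[offset : offset+2]))
		offset += 2
		if offset+locCount*5 > len(data) {
			return nil, fmt.Errorf("truncated shard")
		}

		if id == identifier {
			locs := make([]Location, locCount)
			for j := range locs {
				locs[j] = Location{
					Bundle:    binary.LittleEndian.Uint16(data[offset : offset+2]),
					Position:  binary.LittleEndian.Uint16(data[offset+2 : offset+4]),
					Nullified: data[offset+4] != 0,
				}
				offset += 5
			}
			return locs, nil
		}
		if id > identifier {
			return nil, nil // identifiers are sorted, so the target cannot appear later
		}
		offset += locCount * 5
	}
	return nil, nil
}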