+549
bundle/did_index.go
+549
bundle/did_index.go
···
1
+
package bundle
2
+
3
+
import (
4
+
"encoding/binary"
5
+
"fmt"
6
+
"hash/fnv"
7
+
"os"
8
+
"path/filepath"
9
+
"sort"
10
+
"sync"
11
+
"syscall"
12
+
"time"
13
+
14
+
"github.com/goccy/go-json"
15
+
"tangled.org/atscan.net/plcbundle/plc"
16
+
)
17
+
18
+
const (
	// Filesystem layout: index lives in <baseDir>/.plcbundle, shard files
	// under shards/, metadata in config.json.
	DID_INDEX_DIR    = ".plcbundle"
	DID_INDEX_SHARDS = "shards"
	DID_INDEX_CONFIG = "config.json"

	// Number of hash-based shards; must fit in a uint8 shard number.
	DID_SHARD_COUNT = 256

	DID_PREFIX         = "did:plc:"
	DID_IDENTIFIER_LEN = 24 // Without "did:plc:" prefix

	// Binary format constants (renamed to avoid conflict)
	DIDINDEX_MAGIC   = "PLCD" // 4-byte magic at the start of every shard file
	DIDINDEX_VERSION = 1
)
30
+
31
+
// DIDIndexManager manages sharded DID position indexes with mmap.
// DIDs are hashed into DID_SHARD_COUNT shard files; each shard is a sorted
// binary file that is memory-mapped on first access and kept in a small
// LRU cache. All cache fields are guarded by cacheMu.
type DIDIndexManager struct {
	baseDir    string // repository root
	indexDir   string // <baseDir>/.plcbundle
	shardDir   string // <indexDir>/shards
	configPath string // <indexDir>/config.json

	// LRU cache for hot shards
	shardCache map[uint8]*mmapShard
	cacheOrder []uint8 // For LRU: oldest at index 0, most recent at the end
	maxCache   int     // max number of mmapped shards kept resident
	cacheMu    sync.RWMutex

	config  *DIDIndexConfig
	logger  Logger
	verbose bool // when true, lookup paths emit DEBUG logs
}
48
+
49
+
// mmapShard represents a memory-mapped shard file.
// data is nil for shards that have no file on disk (empty shards);
// file stays open for the lifetime of the mapping and is closed by
// unmapShard together with the munmap.
type mmapShard struct {
	shardNum uint8
	data     []byte   // read-only mmap of the whole shard file; nil if empty
	file     *os.File // backing file handle; nil for empty shards
	lastUsed time.Time
}
56
+
57
+
// DIDIndexConfig stores index metadata, persisted as JSON at configPath.
type DIDIndexConfig struct {
	Version    int       `json:"version"`     // index format version (DIDINDEX_VERSION)
	Format     string    `json:"format"`      // e.g. "binary_v1"
	ShardCount int       `json:"shard_count"` // number of shards (DID_SHARD_COUNT)
	TotalDIDs  int64     `json:"total_dids"`  // distinct DIDs indexed
	UpdatedAt  time.Time `json:"updated_at"`
	LastBundle int       `json:"last_bundle"` // highest bundle number incorporated
}
66
+
67
+
// OpLocation represents the exact location of an operation: which bundle it
// is stored in and its position within that bundle's operation list.
// Serialized as 5 bytes: bundle u16 + position u16 + nullified u8 (LE).
type OpLocation struct {
	Bundle    uint16
	Position  uint16
	Nullified bool // true if the operation was nullified by a later op
}
73
+
74
+
// NewDIDIndexManager creates a new DID index manager
75
+
func NewDIDIndexManager(baseDir string, logger Logger) *DIDIndexManager {
76
+
indexDir := filepath.Join(baseDir, DID_INDEX_DIR)
77
+
shardDir := filepath.Join(indexDir, DID_INDEX_SHARDS)
78
+
configPath := filepath.Join(indexDir, DID_INDEX_CONFIG)
79
+
80
+
// Ensure directories exist
81
+
os.MkdirAll(shardDir, 0755)
82
+
83
+
// Load or create config
84
+
config, _ := loadIndexConfig(configPath)
85
+
if config == nil {
86
+
config = &DIDIndexConfig{
87
+
Version: DIDINDEX_VERSION,
88
+
Format: "binary_v1",
89
+
ShardCount: DID_SHARD_COUNT,
90
+
UpdatedAt: time.Now().UTC(),
91
+
}
92
+
}
93
+
94
+
return &DIDIndexManager{
95
+
baseDir: baseDir,
96
+
indexDir: indexDir,
97
+
shardDir: shardDir,
98
+
configPath: configPath,
99
+
shardCache: make(map[uint8]*mmapShard),
100
+
cacheOrder: make([]uint8, 0),
101
+
maxCache: 20, // Keep 20 hot shards in memory (~120 MB)
102
+
config: config,
103
+
logger: logger,
104
+
}
105
+
}
106
+
107
+
// Close unmaps all shards and cleans up
108
+
func (dim *DIDIndexManager) Close() error {
109
+
dim.cacheMu.Lock()
110
+
defer dim.cacheMu.Unlock()
111
+
112
+
for _, shard := range dim.shardCache {
113
+
dim.unmapShard(shard)
114
+
}
115
+
116
+
dim.shardCache = make(map[uint8]*mmapShard)
117
+
dim.cacheOrder = make([]uint8, 0)
118
+
119
+
return nil
120
+
}
121
+
122
+
// SetVerbose toggles DEBUG logging on lookup paths.
func (dim *DIDIndexManager) SetVerbose(verbose bool) {
	dim.verbose = verbose
}
125
+
126
+
// GetDIDLocations returns all bundle+position locations recorded for a DID.
// Returns (nil, nil) when the DID's shard exists but is empty or the DID is
// not present; returns an error only for invalid DIDs or shard load failures.
func (dim *DIDIndexManager) GetDIDLocations(did string) ([]OpLocation, error) {
	// Validate and extract the 24-char identifier (strips "did:plc:").
	identifier, err := extractDIDIdentifier(did)
	if err != nil {
		return nil, err
	}

	// Calculate shard number (FNV-1a hash of the identifier mod shard count).
	shardNum := dim.calculateShard(identifier)
	if dim.verbose {
		dim.logger.Printf("DEBUG: DID %s -> identifier '%s' -> shard %02x", did, identifier, shardNum)
	}

	// Load shard (from LRU cache or mmapped from disk).
	shard, err := dim.loadShard(shardNum)
	if err != nil {
		if dim.verbose {
			dim.logger.Printf("DEBUG: Failed to load shard: %v", err)
		}
		return nil, fmt.Errorf("failed to load shard %02x: %w", shardNum, err)
	}

	// A nil data slice means the shard file doesn't exist yet — no DIDs here.
	if shard.data == nil {
		if dim.verbose {
			dim.logger.Printf("DEBUG: Shard %02x has no data (empty shard)", shardNum)
		}
		return nil, nil
	}

	if dim.verbose {
		dim.logger.Printf("DEBUG: Shard %02x loaded, size: %d bytes", shardNum, len(shard.data))
	}

	// Binary search over the sorted entries in the mapped file.
	locations := dim.searchShard(shard, identifier)
	if dim.verbose {
		dim.logger.Printf("DEBUG: Binary search found %d locations", len(locations))
		if len(locations) > 0 {
			dim.logger.Printf("DEBUG: Locations: %v", locations)
		}
	}

	return locations, nil
}
171
+
172
+
// calculateShard determines which shard a DID belongs to
173
+
func (dim *DIDIndexManager) calculateShard(identifier string) uint8 {
174
+
h := fnv.New32a()
175
+
h.Write([]byte(identifier))
176
+
hash := h.Sum32()
177
+
return uint8(hash % DID_SHARD_COUNT)
178
+
}
179
+
180
+
// loadShard loads a shard from the LRU cache or memory-maps it from disk.
// Missing or zero-length shard files yield an mmapShard with nil data
// (a valid "empty shard"); note these empty shards are NOT added to the
// cache, so repeated lookups re-stat the file each time.
func (dim *DIDIndexManager) loadShard(shardNum uint8) (*mmapShard, error) {
	dim.cacheMu.Lock()
	defer dim.cacheMu.Unlock()

	// Cache hit: refresh recency and return.
	if shard, exists := dim.shardCache[shardNum]; exists {
		shard.lastUsed = time.Now()
		dim.updateLRU(shardNum)
		return shard, nil
	}

	// Load from disk
	shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum))

	// Check if file exists
	if _, err := os.Stat(shardPath); os.IsNotExist(err) {
		// Shard doesn't exist yet (no DIDs in this shard)
		return &mmapShard{
			shardNum: shardNum,
			data:     nil,
			lastUsed: time.Now(),
		}, nil
	}

	// Open file
	file, err := os.Open(shardPath)
	if err != nil {
		return nil, err
	}

	// Get file size (needed for the mmap length).
	info, err := file.Stat()
	if err != nil {
		file.Close()
		return nil, err
	}

	// Zero-length file: treat as an empty shard, same as non-existent.
	if info.Size() == 0 {
		file.Close()
		return &mmapShard{
			shardNum: shardNum,
			data:     nil,
			lastUsed: time.Now(),
		}, nil
	}

	// Memory-map the file read-only; the file handle is retained so that
	// unmapShard can close it when the shard is evicted.
	data, err := syscall.Mmap(int(file.Fd()), 0, int(info.Size()),
		syscall.PROT_READ, syscall.MAP_SHARED)
	if err != nil {
		file.Close()
		return nil, fmt.Errorf("mmap failed: %w", err)
	}

	shard := &mmapShard{
		shardNum: shardNum,
		data:     data,
		file:     file,
		lastUsed: time.Now(),
	}

	// Add to cache
	dim.shardCache[shardNum] = shard
	dim.cacheOrder = append(dim.cacheOrder, shardNum)

	// Evict the least-recently-used shard if the cache is over capacity.
	if len(dim.shardCache) > dim.maxCache {
		dim.evictLRU()
	}

	return shard, nil
}
253
+
254
+
// searchShard performs a binary search for identifier in a memory-mapped
// shard and returns its locations, or nil if not found / shard malformed.
//
// Shard layout: 32-byte header (magic[4], version u32, shard u8,
// entryCount u32 at bytes 9:13, rest reserved) followed by sorted
// variable-length entries. NOTE(review): because entries are variable-length,
// getEntryOffset walks from the start on every probe, making each lookup
// O(n log n) in entry count rather than O(log n) — acceptable for small
// shards, but worth an offset table if shards grow large.
func (dim *DIDIndexManager) searchShard(shard *mmapShard, identifier string) []OpLocation {
	// Too small to contain even a header.
	if shard.data == nil || len(shard.data) < 32 {
		return nil
	}

	data := shard.data

	// Read header
	if string(data[0:4]) != DIDINDEX_MAGIC {
		dim.logger.Printf("Warning: invalid shard magic")
		return nil
	}

	entryCount := binary.LittleEndian.Uint32(data[9:13])
	if entryCount == 0 {
		return nil
	}

	if dim.verbose {
		dim.logger.Printf("DEBUG: Searching %d entries for '%s'", entryCount, identifier)
	}

	// Binary search in entries over [left, right).
	left, right := 0, int(entryCount)

	// Track for debugging
	attempts := 0

	for left < right {
		attempts++
		mid := (left + right) / 2

		// Calculate the byte offset of the mid-th entry (linear walk).
		entryOffset := dim.getEntryOffset(data, 32, mid)
		if entryOffset < 0 {
			if dim.verbose {
				dim.logger.Printf("DEBUG: Invalid entry offset at mid=%d", mid)
			}
			return nil
		}

		// Bounds-check before reading the 24-byte identifier.
		if entryOffset+DID_IDENTIFIER_LEN > len(data) {
			if dim.verbose {
				dim.logger.Printf("DEBUG: Entry offset out of bounds: %d + %d > %d",
					entryOffset, DID_IDENTIFIER_LEN, len(data))
			}
			return nil
		}

		entryID := string(data[entryOffset : entryOffset+DID_IDENTIFIER_LEN])

		if dim.verbose && attempts <= 5 {
			dim.logger.Printf("DEBUG: Attempt %d: mid=%d, comparing '%s' vs '%s'",
				attempts, mid, identifier, entryID)
		}

		// Lexicographic comparison: -1 target sorts before entry, 1 after.
		cmp := 0
		if identifier < entryID {
			cmp = -1
		} else if identifier > entryID {
			cmp = 1
		}

		if cmp == 0 {
			// Found!
			if dim.verbose {
				dim.logger.Printf("DEBUG: FOUND at mid=%d after %d attempts", mid, attempts)
			}
			return dim.readLocations(data, entryOffset)
		} else if cmp < 0 {
			right = mid
		} else {
			left = mid + 1
		}
	}

	if dim.verbose {
		dim.logger.Printf("DEBUG: NOT FOUND after %d attempts (left=%d, right=%d)",
			attempts, left, right)
	}

	return nil // Not found
}
340
+
341
+
// getEntryOffset calculates byte offset for nth entry
342
+
func (dim *DIDIndexManager) getEntryOffset(data []byte, baseOffset int, entryIndex int) int {
343
+
offset := baseOffset
344
+
345
+
// Skip to the nth entry
346
+
for i := 0; i < entryIndex; i++ {
347
+
if offset+DID_IDENTIFIER_LEN+2 > len(data) {
348
+
return -1
349
+
}
350
+
351
+
// Skip identifier (24 bytes)
352
+
offset += DID_IDENTIFIER_LEN
353
+
354
+
// Read location count
355
+
count := binary.LittleEndian.Uint16(data[offset : offset+2])
356
+
offset += 2
357
+
358
+
// Skip locations (5 bytes each)
359
+
offset += int(count) * 5
360
+
}
361
+
362
+
return offset
363
+
}
364
+
365
+
// readLocations reads location data at given offset
366
+
func (dim *DIDIndexManager) readLocations(data []byte, offset int) []OpLocation {
367
+
// Skip identifier
368
+
offset += DID_IDENTIFIER_LEN
369
+
370
+
// Read count
371
+
if offset+2 > len(data) {
372
+
return nil
373
+
}
374
+
count := binary.LittleEndian.Uint16(data[offset : offset+2])
375
+
offset += 2
376
+
377
+
// Read locations
378
+
locations := make([]OpLocation, count)
379
+
for i := 0; i < int(count); i++ {
380
+
if offset+5 > len(data) {
381
+
return locations[:i]
382
+
}
383
+
384
+
bundle := binary.LittleEndian.Uint16(data[offset : offset+2])
385
+
position := binary.LittleEndian.Uint16(data[offset+2 : offset+4])
386
+
nullified := data[offset+4] != 0
387
+
388
+
locations[i] = OpLocation{
389
+
Bundle: bundle,
390
+
Position: position,
391
+
Nullified: nullified,
392
+
}
393
+
394
+
offset += 5
395
+
}
396
+
397
+
return locations
398
+
}
399
+
400
+
// updateLRU updates LRU order
401
+
func (dim *DIDIndexManager) updateLRU(shardNum uint8) {
402
+
// Remove from current position
403
+
for i, s := range dim.cacheOrder {
404
+
if s == shardNum {
405
+
dim.cacheOrder = append(dim.cacheOrder[:i], dim.cacheOrder[i+1:]...)
406
+
break
407
+
}
408
+
}
409
+
410
+
// Add to end (most recent)
411
+
dim.cacheOrder = append(dim.cacheOrder, shardNum)
412
+
}
413
+
414
+
// evictLRU removes least recently used shard
415
+
func (dim *DIDIndexManager) evictLRU() {
416
+
if len(dim.cacheOrder) == 0 {
417
+
return
418
+
}
419
+
420
+
// Remove oldest
421
+
victimNum := dim.cacheOrder[0]
422
+
dim.cacheOrder = dim.cacheOrder[1:]
423
+
424
+
if victim, exists := dim.shardCache[victimNum]; exists {
425
+
dim.unmapShard(victim)
426
+
delete(dim.shardCache, victimNum)
427
+
}
428
+
}
429
+
430
+
// unmapShard unmaps and closes a shard
431
+
func (dim *DIDIndexManager) unmapShard(shard *mmapShard) {
432
+
if shard.data != nil {
433
+
syscall.Munmap(shard.data)
434
+
}
435
+
if shard.file != nil {
436
+
shard.file.Close()
437
+
}
438
+
}
439
+
440
+
// GetStats returns index statistics
441
+
func (dim *DIDIndexManager) GetStats() map[string]interface{} {
442
+
dim.cacheMu.RLock()
443
+
defer dim.cacheMu.RUnlock()
444
+
445
+
cachedShards := make([]int, 0)
446
+
for num := range dim.shardCache {
447
+
cachedShards = append(cachedShards, int(num))
448
+
}
449
+
sort.Ints(cachedShards)
450
+
451
+
return map[string]interface{}{
452
+
"total_dids": dim.config.TotalDIDs,
453
+
"last_bundle": dim.config.LastBundle,
454
+
"shard_count": dim.config.ShardCount,
455
+
"cached_shards": len(dim.shardCache),
456
+
"cache_limit": dim.maxCache,
457
+
"cache_order": cachedShards,
458
+
"updated_at": dim.config.UpdatedAt,
459
+
}
460
+
}
461
+
462
+
// Exists checks if index exists
463
+
func (dim *DIDIndexManager) Exists() bool {
464
+
_, err := os.Stat(dim.configPath)
465
+
return err == nil
466
+
}
467
+
468
+
// extractDIDIdentifier extracts the 24-char identifier from full DID
469
+
func extractDIDIdentifier(did string) (string, error) {
470
+
if err := plc.ValidateDIDFormat(did); err != nil {
471
+
return "", err
472
+
}
473
+
474
+
// Remove "did:plc:" prefix
475
+
identifier := did[8:] // Skip "did:plc:"
476
+
477
+
if len(identifier) != DID_IDENTIFIER_LEN {
478
+
return "", fmt.Errorf("invalid identifier length: %d", len(identifier))
479
+
}
480
+
481
+
return identifier, nil
482
+
}
483
+
484
+
// loadIndexConfig loads index configuration
485
+
func loadIndexConfig(path string) (*DIDIndexConfig, error) {
486
+
data, err := os.ReadFile(path)
487
+
if err != nil {
488
+
return nil, err
489
+
}
490
+
491
+
var config DIDIndexConfig
492
+
if err := json.Unmarshal(data, &config); err != nil {
493
+
return nil, err
494
+
}
495
+
496
+
return &config, nil
497
+
}
498
+
499
+
// saveIndexConfig saves index configuration
500
+
func (dim *DIDIndexManager) saveIndexConfig() error {
501
+
dim.config.UpdatedAt = time.Now().UTC()
502
+
503
+
data, err := json.MarshalIndent(dim.config, "", " ")
504
+
if err != nil {
505
+
return err
506
+
}
507
+
508
+
return os.WriteFile(dim.configPath, data, 0644)
509
+
}
510
+
511
+
// Add this debug command to verify shard integrity
512
+
513
+
func (dim *DIDIndexManager) DebugShard(shardNum uint8) error {
514
+
shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum))
515
+
data, err := os.ReadFile(shardPath)
516
+
if err != nil {
517
+
return err
518
+
}
519
+
520
+
fmt.Printf("Shard %02x debug:\n", shardNum)
521
+
fmt.Printf(" File size: %d bytes\n", len(data))
522
+
fmt.Printf(" Magic: %s\n", string(data[0:4]))
523
+
fmt.Printf(" Version: %d\n", binary.LittleEndian.Uint32(data[4:8]))
524
+
fmt.Printf(" Shard num: %d\n", data[8])
525
+
fmt.Printf(" Entry count: %d\n", binary.LittleEndian.Uint32(data[9:13]))
526
+
527
+
// Show first few entries
528
+
entryCount := binary.LittleEndian.Uint32(data[9:13])
529
+
fmt.Printf("\n First 5 entries:\n")
530
+
531
+
offset := 32
532
+
for i := 0; i < 5 && i < int(entryCount); i++ {
533
+
if offset+DID_IDENTIFIER_LEN+2 > len(data) {
534
+
break
535
+
}
536
+
537
+
identifier := string(data[offset : offset+DID_IDENTIFIER_LEN])
538
+
offset += DID_IDENTIFIER_LEN
539
+
540
+
locCount := binary.LittleEndian.Uint16(data[offset : offset+2])
541
+
offset += 2
542
+
543
+
fmt.Printf(" %d. '%s' (%d locations)\n", i+1, identifier, locCount)
544
+
545
+
offset += int(locCount) * 4 // Skip locations
546
+
}
547
+
548
+
return nil
549
+
}
+418
bundle/did_index_builder.go
+418
bundle/did_index_builder.go
···
1
+
package bundle
2
+
3
+
import (
4
+
"context"
5
+
"encoding/binary"
6
+
"fmt"
7
+
"os"
8
+
"path/filepath"
9
+
"sort"
10
+
"sync"
11
+
)
12
+
13
+
const (
	// BUILD_BATCH_SIZE was intended to process bundles in batches; the
	// current streaming implementation does not use it.
	BUILD_BATCH_SIZE = 100 // Process 100 bundles at a time (not used in streaming approach)
)
16
+
17
+
// ShardBuilder accumulates DID positions for a shard prior to writing it.
// entries maps a 24-char DID identifier to its operation locations; mu
// makes add safe for concurrent producers.
type ShardBuilder struct {
	entries map[string][]OpLocation
	mu      sync.Mutex
}
22
+
23
+
// newShardBuilder creates a new shard builder
24
+
func newShardBuilder() *ShardBuilder {
25
+
return &ShardBuilder{
26
+
entries: make(map[string][]OpLocation),
27
+
}
28
+
}
29
+
30
+
// add adds a location to the shard
31
+
func (sb *ShardBuilder) add(identifier string, bundle uint16, position uint16, nullified bool) {
32
+
sb.mu.Lock()
33
+
defer sb.mu.Unlock()
34
+
35
+
sb.entries[identifier] = append(sb.entries[identifier], OpLocation{
36
+
Bundle: bundle,
37
+
Position: position,
38
+
Nullified: nullified,
39
+
})
40
+
}
41
+
42
+
// BuildIndexFromScratch rebuilds the entire DID index with bounded memory.
//
// Two passes:
//  1. Stream every operation of every bundle into 256 per-shard temp files
//     as fixed 29-byte records (24-byte identifier + bundle u16 + pos u16 +
//     nullified u8, little-endian).
//  2. Consolidate each temp file into its sorted binary shard.
//
// Temp files are removed on context cancellation or setup failure.
// progressCallback, if non-nil, is invoked once per bundle in pass 1.
// NOTE(review): bundle numbers and positions are truncated to uint16 —
// assumes both stay below 65536; verify against bundle growth.
func (dim *DIDIndexManager) BuildIndexFromScratch(ctx context.Context, mgr *Manager, progressCallback func(current, total int)) error {
	dim.logger.Printf("Building DID index from scratch (memory-efficient mode)...")

	bundles := mgr.index.GetBundles()
	if len(bundles) == 0 {
		return fmt.Errorf("no bundles to index")
	}

	// Create one temp file per shard; on failure, clean up the ones
	// already created before bailing out.
	tempShards := make([]*os.File, DID_SHARD_COUNT)
	for i := 0; i < DID_SHARD_COUNT; i++ {
		tempPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.tmp", i))
		f, err := os.Create(tempPath)
		if err != nil {
			for j := 0; j < i; j++ {
				tempShards[j].Close()
				os.Remove(filepath.Join(dim.shardDir, fmt.Sprintf("%02x.tmp", j)))
			}
			return fmt.Errorf("failed to create temp shard: %w", err)
		}
		tempShards[i] = f
	}

	dim.logger.Printf("Pass 1/2: Scanning %d bundles...", len(bundles))

	// Stream all operations to temp files
	for i, meta := range bundles {
		// Honor cancellation between bundles; remove all temp files.
		select {
		case <-ctx.Done():
			for _, f := range tempShards {
				f.Close()
				os.Remove(f.Name())
			}
			return ctx.Err()
		default:
		}

		if progressCallback != nil {
			progressCallback(i+1, len(bundles))
		}

		// Load bundle; a failed bundle is skipped with a warning rather
		// than aborting the whole build.
		bundle, err := mgr.LoadBundle(ctx, meta.BundleNumber)
		if err != nil {
			dim.logger.Printf("Warning: failed to load bundle %d: %v", meta.BundleNumber, err)
			continue
		}

		// Process each operation
		for pos, op := range bundle.Operations {
			identifier, err := extractDIDIdentifier(op.DID)
			if err != nil {
				// Skip malformed DIDs silently; they cannot be indexed.
				continue
			}

			shardNum := dim.calculateShard(identifier)

			// Write entry: [24 bytes ID][2 bytes bundle][2 bytes pos][1 byte nullified]
			entry := make([]byte, 29)
			copy(entry[0:24], identifier)
			binary.LittleEndian.PutUint16(entry[24:26], uint16(meta.BundleNumber))
			binary.LittleEndian.PutUint16(entry[26:28], uint16(pos))

			// Store nullified flag
			if op.IsNullified() {
				entry[28] = 1
			} else {
				entry[28] = 0
			}

			if _, err := tempShards[shardNum].Write(entry); err != nil {
				dim.logger.Printf("Warning: failed to write to temp shard %02x: %v", shardNum, err)
			}
		}
	}

	// Close temp files
	for _, f := range tempShards {
		f.Close()
	}

	dim.logger.Printf("\n")
	dim.logger.Printf("Pass 2/2: Consolidating %d shards...", DID_SHARD_COUNT)

	// Consolidate shards: sort each temp file and write the final shard.
	totalDIDs := int64(0)
	for i := 0; i < DID_SHARD_COUNT; i++ {
		// Log every 32 shards
		if i%32 == 0 || i == DID_SHARD_COUNT-1 {
			dim.logger.Printf("  Consolidating shards: %d/%d (%.1f%%)",
				i+1, DID_SHARD_COUNT, float64(i+1)/float64(DID_SHARD_COUNT)*100)
		}

		count, err := dim.consolidateShard(uint8(i))
		if err != nil {
			return fmt.Errorf("failed to consolidate shard %02x: %w", i, err)
		}
		totalDIDs += count
	}

	// Persist the final stats so Exists()/GetStats() reflect the build.
	dim.config.TotalDIDs = totalDIDs
	dim.config.LastBundle = bundles[len(bundles)-1].BundleNumber

	if err := dim.saveIndexConfig(); err != nil {
		return fmt.Errorf("failed to save config: %w", err)
	}

	dim.logger.Printf("✓ Index built: %d DIDs indexed", totalDIDs)

	return nil
}
154
+
155
+
// consolidateShard reads a shard's temp file of fixed 29-byte records,
// sorts them by identifier, groups them per DID, writes the final binary
// shard, and deletes the temp file. Returns the number of distinct DIDs.
// A missing or empty temp file means the shard has no entries (returns 0).
func (dim *DIDIndexManager) consolidateShard(shardNum uint8) (int64, error) {
	tempPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.tmp", shardNum))

	// Read all entries from temp file
	data, err := os.ReadFile(tempPath)
	if err != nil {
		if os.IsNotExist(err) {
			return 0, nil // No entries for this shard
		}
		return 0, err
	}

	if len(data) == 0 {
		os.Remove(tempPath)
		return 0, nil
	}

	// Parse entries (29 bytes each); any other size means a torn write.
	entryCount := len(data) / 29
	if len(data)%29 != 0 {
		return 0, fmt.Errorf("corrupted temp shard: size not multiple of 29")
	}

	type tempEntry struct {
		identifier string
		bundle     uint16
		position   uint16
		nullified  bool
	}

	entries := make([]tempEntry, entryCount)
	for i := 0; i < entryCount; i++ {
		offset := i * 29
		entries[i] = tempEntry{
			identifier: string(data[offset : offset+24]),
			bundle:     binary.LittleEndian.Uint16(data[offset+24 : offset+26]),
			position:   binary.LittleEndian.Uint16(data[offset+26 : offset+28]),
			nullified:  data[offset+28] != 0,
		}
	}

	// Free the data slice (let GC reclaim the raw file bytes).
	data = nil

	// Sort by identifier so the final shard supports binary search.
	sort.Slice(entries, func(i, j int) bool {
		return entries[i].identifier < entries[j].identifier
	})

	// Group by DID. Sorting first keeps each DID's locations in bundle
	// order, since temp records were appended in scan order.
	builder := newShardBuilder()
	for _, entry := range entries {
		builder.add(entry.identifier, entry.bundle, entry.position, entry.nullified)
	}

	// Free entries
	entries = nil

	// Write final shard
	if err := dim.writeShard(shardNum, builder); err != nil {
		return 0, err
	}

	// Clean up temp file
	os.Remove(tempPath)

	return int64(len(builder.entries)), nil
}
224
+
225
+
// UpdateIndexForBundle incrementally merges one new bundle's operations
// into the index: operations are grouped per shard, then each affected
// shard is read, merged, and rewritten one at a time to bound memory.
// Updates config.LastBundle on success.
// NOTE(review): config.TotalDIDs is not adjusted here — only a full
// rebuild refreshes that counter; confirm this is intended.
func (dim *DIDIndexManager) UpdateIndexForBundle(ctx context.Context, bundle *Bundle) error {
	// Group operations by shard
	shardOps := make(map[uint8]map[string][]OpLocation)

	for pos, op := range bundle.Operations {
		identifier, err := extractDIDIdentifier(op.DID)
		if err != nil {
			// Malformed DIDs cannot be indexed; skip them.
			continue
		}

		shardNum := dim.calculateShard(identifier)

		if shardOps[shardNum] == nil {
			shardOps[shardNum] = make(map[string][]OpLocation)
		}

		shardOps[shardNum][identifier] = append(shardOps[shardNum][identifier], OpLocation{
			Bundle:    uint16(bundle.BundleNumber),
			Position:  uint16(pos),
			Nullified: op.IsNullified(),
		})
	}

	// Update affected shards (one at a time to save memory)
	for shardNum, newOps := range shardOps {
		if err := dim.updateShard(shardNum, newOps); err != nil {
			return fmt.Errorf("failed to update shard %02x: %w", shardNum, err)
		}
	}

	// Update config
	dim.config.LastBundle = bundle.BundleNumber
	return dim.saveIndexConfig()
}
260
+
261
+
// updateShard updates a single shard with new operations
262
+
func (dim *DIDIndexManager) updateShard(shardNum uint8, newOps map[string][]OpLocation) error {
263
+
// Load existing shard data
264
+
shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum))
265
+
266
+
existingBuilder := newShardBuilder()
267
+
268
+
// Read existing shard if it exists
269
+
if data, err := os.ReadFile(shardPath); err == nil && len(data) > 32 {
270
+
if err := dim.parseShardData(data, existingBuilder); err != nil {
271
+
return err
272
+
}
273
+
}
274
+
275
+
// Merge new operations
276
+
for identifier, locations := range newOps {
277
+
existingBuilder.entries[identifier] = append(existingBuilder.entries[identifier], locations...)
278
+
}
279
+
280
+
// Write updated shard
281
+
return dim.writeShard(shardNum, existingBuilder)
282
+
}
283
+
284
+
// parseShardData parses binary shard data into builder
285
+
func (dim *DIDIndexManager) parseShardData(data []byte, builder *ShardBuilder) error {
286
+
if len(data) < 32 {
287
+
return nil
288
+
}
289
+
290
+
entryCount := binary.LittleEndian.Uint32(data[9:13]) // Fixed offset
291
+
offset := 32
292
+
293
+
for i := 0; i < int(entryCount); i++ {
294
+
if offset+DID_IDENTIFIER_LEN+2 > len(data) {
295
+
break
296
+
}
297
+
298
+
// Read identifier
299
+
identifier := string(data[offset : offset+DID_IDENTIFIER_LEN])
300
+
offset += DID_IDENTIFIER_LEN
301
+
302
+
// Read location count
303
+
locCount := binary.LittleEndian.Uint16(data[offset : offset+2])
304
+
offset += 2
305
+
306
+
// Read locations
307
+
locations := make([]OpLocation, locCount)
308
+
for j := 0; j < int(locCount); j++ {
309
+
if offset+5 > len(data) {
310
+
break
311
+
}
312
+
313
+
locations[j] = OpLocation{
314
+
Bundle: binary.LittleEndian.Uint16(data[offset : offset+2]),
315
+
Position: binary.LittleEndian.Uint16(data[offset+2 : offset+4]),
316
+
Nullified: data[offset+4] != 0,
317
+
}
318
+
offset += 5
319
+
}
320
+
321
+
builder.entries[identifier] = locations
322
+
}
323
+
324
+
return nil
325
+
}
326
+
327
+
// writeShard serializes a builder's entries into the binary shard format
// and writes the file atomically (write to .tmp, then rename). The cached
// mmap for this shard is invalidated afterwards so readers remap fresh data.
//
// Format: 32-byte header (magic[4], version u32, shard u8, entry count u32
// at bytes 9:13, bytes 13-31 reserved/zero), then entries sorted by
// identifier: 24-byte identifier, uint16 location count, count*5 bytes of
// locations (bundle u16, position u16, nullified u8), all little-endian.
func (dim *DIDIndexManager) writeShard(shardNum uint8, builder *ShardBuilder) error {
	// Sort identifiers so the shard supports binary search.
	identifiers := make([]string, 0, len(builder.entries))
	for id := range builder.entries {
		identifiers = append(identifiers, id)
	}
	sort.Strings(identifiers)

	// Calculate the exact output size so the buffer is allocated once.
	totalSize := 32 // Header
	for _, id := range identifiers {
		locations := builder.entries[id]
		totalSize += DID_IDENTIFIER_LEN + 2 + (len(locations) * 5)
	}

	// Allocate buffer
	buf := make([]byte, totalSize)

	// Write header
	copy(buf[0:4], DIDINDEX_MAGIC)
	binary.LittleEndian.PutUint32(buf[4:8], DIDINDEX_VERSION)
	buf[8] = shardNum
	binary.LittleEndian.PutUint32(buf[9:13], uint32(len(identifiers)))
	// Reserved bytes 13-31 stay zero (make() zero-fills).

	// Write entries
	offset := 32
	for _, identifier := range identifiers {
		locations := builder.entries[identifier]

		// Write identifier (24 bytes)
		copy(buf[offset:offset+DID_IDENTIFIER_LEN], identifier)
		offset += DID_IDENTIFIER_LEN

		// Write location count
		binary.LittleEndian.PutUint16(buf[offset:offset+2], uint16(len(locations)))
		offset += 2

		// Write locations (5 bytes each)
		for _, loc := range locations {
			binary.LittleEndian.PutUint16(buf[offset:offset+2], loc.Bundle)
			binary.LittleEndian.PutUint16(buf[offset+2:offset+4], loc.Position)

			// Write nullified flag
			if loc.Nullified {
				buf[offset+4] = 1
			} else {
				buf[offset+4] = 0
			}

			offset += 5
		}
	}

	// Write atomically (tmp → rename) so readers never see a partial shard.
	shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum))
	tempPath := shardPath + ".tmp"

	if err := os.WriteFile(tempPath, buf, 0644); err != nil {
		return err
	}

	if err := os.Rename(tempPath, shardPath); err != nil {
		os.Remove(tempPath)
		return err
	}

	// Invalidate cache for this shard so the stale mmap is dropped.
	dim.invalidateShard(shardNum)

	return nil
}
400
+
401
+
// invalidateShard removes a shard from cache
402
+
func (dim *DIDIndexManager) invalidateShard(shardNum uint8) {
403
+
dim.cacheMu.Lock()
404
+
defer dim.cacheMu.Unlock()
405
+
406
+
if cached, exists := dim.shardCache[shardNum]; exists {
407
+
dim.unmapShard(cached)
408
+
delete(dim.shardCache, shardNum)
409
+
410
+
// Remove from LRU order
411
+
for i, s := range dim.cacheOrder {
412
+
if s == shardNum {
413
+
dim.cacheOrder = append(dim.cacheOrder[:i], dim.cacheOrder[i+1:]...)
414
+
break
415
+
}
416
+
}
417
+
}
418
+
}
+309
bundle/did_resolver.go
+309
bundle/did_resolver.go
···
1
+
package bundle
2
+
3
+
import (
4
+
"context"
5
+
"fmt"
6
+
"sort"
7
+
8
+
"tangled.org/atscan.net/plcbundle/plc"
9
+
)
10
+
11
+
// GetDIDOperationsBundledOnly retrieves operations from bundles only (no mempool)
12
+
func (m *Manager) GetDIDOperationsBundledOnly(ctx context.Context, did string, verbose bool) ([]plc.PLCOperation, error) {
13
+
if err := plc.ValidateDIDFormat(did); err != nil {
14
+
return nil, err
15
+
}
16
+
17
+
// Set verbose mode on index
18
+
if m.didIndex != nil {
19
+
m.didIndex.SetVerbose(verbose)
20
+
}
21
+
22
+
// Use index if available
23
+
if m.didIndex != nil && m.didIndex.Exists() {
24
+
if verbose {
25
+
m.logger.Printf("DEBUG: Using DID index for lookup")
26
+
}
27
+
return m.getDIDOperationsIndexed(ctx, did, verbose)
28
+
}
29
+
30
+
// Fallback to full scan
31
+
if verbose {
32
+
m.logger.Printf("DEBUG: DID index not available, using full scan")
33
+
}
34
+
m.logger.Printf("Warning: DID index not available, falling back to full scan")
35
+
return m.getDIDOperationsScan(ctx, did)
36
+
}
37
+
38
+
// GetDIDOperationsFromMempool retrieves operations for a DID from mempool only
39
+
func (m *Manager) GetDIDOperationsFromMempool(did string) ([]plc.PLCOperation, error) {
40
+
if err := plc.ValidateDIDFormat(did); err != nil {
41
+
return nil, err
42
+
}
43
+
44
+
if m.mempool == nil {
45
+
return []plc.PLCOperation{}, nil
46
+
}
47
+
48
+
allMempoolOps, err := m.GetMempoolOperations()
49
+
if err != nil {
50
+
return nil, err
51
+
}
52
+
53
+
var matchingOps []plc.PLCOperation
54
+
for _, op := range allMempoolOps {
55
+
if op.DID == did {
56
+
matchingOps = append(matchingOps, op)
57
+
}
58
+
}
59
+
60
+
return matchingOps, nil
61
+
}
62
+
63
+
// GetDIDOperations retrieves all operations for a DID (bundles + mempool combined)
64
+
func (m *Manager) GetDIDOperations(ctx context.Context, did string, verbose bool) ([]plc.PLCOperation, error) {
65
+
if err := plc.ValidateDIDFormat(did); err != nil {
66
+
return nil, err
67
+
}
68
+
69
+
// Get bundled operations
70
+
bundledOps, err := m.GetDIDOperationsBundledOnly(ctx, did, verbose)
71
+
if err != nil {
72
+
return nil, err
73
+
}
74
+
75
+
// Get mempool operations
76
+
mempoolOps, err := m.GetDIDOperationsFromMempool(did)
77
+
if err != nil {
78
+
return nil, err
79
+
}
80
+
81
+
if len(mempoolOps) > 0 && verbose {
82
+
m.logger.Printf("DEBUG: Found %d operations in mempool", len(mempoolOps))
83
+
}
84
+
85
+
// Combine and sort
86
+
allOps := append(bundledOps, mempoolOps...)
87
+
88
+
sort.Slice(allOps, func(i, j int) bool {
89
+
return allOps[i].CreatedAt.Before(allOps[j].CreatedAt)
90
+
})
91
+
92
+
return allOps, nil
93
+
}
94
+
95
+
// getDIDOperationsIndexed uses the DID index for fast lookup (PRIVATE -
// bundles only). Nullified locations are filtered out before loading
// bundles, so bundles containing only nullified ops are never read.
// Results are sorted by creation time ascending.
func (m *Manager) getDIDOperationsIndexed(ctx context.Context, did string, verbose bool) ([]plc.PLCOperation, error) {
	locations, err := m.didIndex.GetDIDLocations(did)
	if err != nil {
		return nil, err
	}

	if len(locations) == 0 {
		return []plc.PLCOperation{}, nil
	}

	// Filter nullified at index level (save loading those bundles!)
	var validLocations []OpLocation
	for _, loc := range locations {
		if !loc.Nullified {
			validLocations = append(validLocations, loc)
		}
	}

	if verbose {
		m.logger.Printf("DEBUG: Filtered %d valid locations (from %d total)",
			len(validLocations), len(locations))
	}

	// Group positions by bundle so each bundle is loaded at most once.
	bundleMap := make(map[uint16][]uint16)
	for _, loc := range validLocations {
		bundleMap[loc.Bundle] = append(bundleMap[loc.Bundle], loc.Position)
	}

	if verbose {
		m.logger.Printf("DEBUG: Loading %d bundles", len(bundleMap))
	}

	// Load operations; a failed bundle load is logged and skipped rather
	// than failing the whole lookup.
	var allOps []plc.PLCOperation
	for bundleNum, positions := range bundleMap {
		bundle, err := m.LoadBundle(ctx, int(bundleNum))
		if err != nil {
			m.logger.Printf("Warning: failed to load bundle %d: %v", bundleNum, err)
			continue
		}

		for _, pos := range positions {
			// Guard against stale index entries pointing past the end.
			if int(pos) < len(bundle.Operations) {
				allOps = append(allOps, bundle.Operations[pos])
			}
		}
	}

	if verbose {
		m.logger.Printf("DEBUG: Loaded %d total operations", len(allOps))
	}

	sort.Slice(allOps, func(i, j int) bool {
		return allOps[i].CreatedAt.Before(allOps[j].CreatedAt)
	})

	return allOps, nil
}
155
+
156
+
// getDIDOperationsScan falls back to full scan (slow) (PRIVATE - bundles only)
157
+
func (m *Manager) getDIDOperationsScan(ctx context.Context, did string) ([]plc.PLCOperation, error) {
158
+
var allOps []plc.PLCOperation
159
+
bundles := m.index.GetBundles()
160
+
161
+
for _, meta := range bundles {
162
+
select {
163
+
case <-ctx.Done():
164
+
return nil, ctx.Err()
165
+
default:
166
+
}
167
+
168
+
bundle, err := m.LoadBundle(ctx, meta.BundleNumber)
169
+
if err != nil {
170
+
m.logger.Printf("Warning: failed to load bundle %d: %v", meta.BundleNumber, err)
171
+
continue
172
+
}
173
+
174
+
for _, op := range bundle.Operations {
175
+
if op.DID == did {
176
+
allOps = append(allOps, op)
177
+
}
178
+
}
179
+
}
180
+
181
+
sort.Slice(allOps, func(i, j int) bool {
182
+
return allOps[i].CreatedAt.Before(allOps[j].CreatedAt)
183
+
})
184
+
185
+
return allOps, nil
186
+
}
187
+
188
+
// GetLatestDIDOperation returns only the most recent non-nullified operation
189
+
func (m *Manager) GetLatestDIDOperation(ctx context.Context, did string) (*plc.PLCOperation, error) {
190
+
if err := plc.ValidateDIDFormat(did); err != nil {
191
+
return nil, err
192
+
}
193
+
194
+
// Check mempool first (most recent operations)
195
+
mempoolOps, err := m.GetDIDOperationsFromMempool(did)
196
+
if err == nil && len(mempoolOps) > 0 {
197
+
// Find latest non-nullified in mempool
198
+
for i := len(mempoolOps) - 1; i >= 0; i-- {
199
+
if !mempoolOps[i].IsNullified() {
200
+
// Found in mempool - but still verify nothing newer in bundles
201
+
// (shouldn't happen with chronological ordering, but be safe)
202
+
bundledLatest, _ := m.getLatestDIDOperationIndexed(ctx, did)
203
+
if bundledLatest == nil || mempoolOps[i].CreatedAt.After(bundledLatest.CreatedAt) {
204
+
return &mempoolOps[i], nil
205
+
}
206
+
return bundledLatest, nil
207
+
}
208
+
}
209
+
}
210
+
211
+
// Not in mempool - use index/scan
212
+
if m.didIndex != nil && m.didIndex.Exists() {
213
+
return m.getLatestDIDOperationIndexed(ctx, did)
214
+
}
215
+
216
+
// Fallback to full lookup
217
+
ops, err := m.GetDIDOperationsBundledOnly(ctx, did, false)
218
+
if err != nil {
219
+
return nil, err
220
+
}
221
+
222
+
// Find latest non-nullified
223
+
for i := len(ops) - 1; i >= 0; i-- {
224
+
if !ops[i].IsNullified() {
225
+
return &ops[i], nil
226
+
}
227
+
}
228
+
229
+
return nil, fmt.Errorf("no valid operations found")
230
+
}
231
+
232
+
// getLatestDIDOperationIndexed uses the index to find only the latest
// non-nullified operation (PRIVATE).
//
// "Latest" is chosen by (bundle number, position) ordering — this assumes
// bundles and positions within a bundle are chronological, consistent with
// how the scan/sort paths in this file treat them. Only the single winning
// operation is then read from disk via LoadOperation, avoiding a full-bundle
// decompress.
func (m *Manager) getLatestDIDOperationIndexed(ctx context.Context, did string) (*plc.PLCOperation, error) {
	// Get all locations from index
	locations, err := m.didIndex.GetDIDLocations(did)
	if err != nil {
		return nil, err
	}

	if len(locations) == 0 {
		return nil, fmt.Errorf("DID not found")
	}

	// Filter non-nullified and find the latest
	var latestLoc *OpLocation

	for i := range locations {
		if locations[i].Nullified {
			continue
		}

		// Check if this is later than current latest
		if latestLoc == nil {
			latestLoc = &locations[i]
		} else {
			// Compare by bundle, then position
			if locations[i].Bundle > latestLoc.Bundle ||
				(locations[i].Bundle == latestLoc.Bundle && locations[i].Position > latestLoc.Position) {
				latestLoc = &locations[i]
			}
		}
	}

	if latestLoc == nil {
		return nil, fmt.Errorf("no valid operations found (all nullified)")
	}

	// Load ONLY that one operation (not the whole bundle!)
	op, err := m.LoadOperation(ctx, int(latestLoc.Bundle), int(latestLoc.Position))
	if err != nil {
		return nil, fmt.Errorf("failed to load operation at bundle %d position %d: %w",
			latestLoc.Bundle, latestLoc.Position, err)
	}

	return op, nil
}
277
+
278
+
// BuildDIDIndex builds the complete DID index
279
+
func (m *Manager) BuildDIDIndex(ctx context.Context, progressCallback func(current, total int)) error {
280
+
if m.didIndex == nil {
281
+
m.didIndex = NewDIDIndexManager(m.config.BundleDir, m.logger)
282
+
}
283
+
284
+
return m.didIndex.BuildIndexFromScratch(ctx, m, progressCallback)
285
+
}
286
+
287
+
// UpdateDIDIndexForBundle updates index when a new bundle is added
288
+
func (m *Manager) UpdateDIDIndexForBundle(ctx context.Context, bundle *Bundle) error {
289
+
if m.didIndex == nil {
290
+
return nil // Index not initialized
291
+
}
292
+
293
+
return m.didIndex.UpdateIndexForBundle(ctx, bundle)
294
+
}
295
+
296
+
// GetDIDIndexStats returns DID index statistics
297
+
func (m *Manager) GetDIDIndexStats() map[string]interface{} {
298
+
if m.didIndex == nil {
299
+
return map[string]interface{}{
300
+
"enabled": false,
301
+
}
302
+
}
303
+
304
+
stats := m.didIndex.GetStats()
305
+
stats["enabled"] = true
306
+
stats["exists"] = m.didIndex.Exists()
307
+
308
+
return stats
309
+
}
+55
bundle/manager.go
+55
bundle/manager.go
···
37
37
plcClient *plc.Client
38
38
logger Logger
39
39
mempool *Mempool
40
+
didIndex *DIDIndexManager
40
41
}
41
42
42
43
// NewManager creates a new bundle manager
···
259
260
return nil, fmt.Errorf("failed to initialize mempool: %w", err)
260
261
}
261
262
263
+
// Initialize DID index manager
264
+
didIndex := NewDIDIndexManager(config.BundleDir, config.Logger)
265
+
262
266
return &Manager{
263
267
config: config,
264
268
operations: ops,
···
267
271
plcClient: plcClient,
268
272
logger: config.Logger,
269
273
mempool: mempool,
274
+
didIndex: didIndex,
270
275
}, nil
271
276
}
272
277
···
282
287
if err := m.mempool.Save(); err != nil {
283
288
m.logger.Printf("Warning: failed to save mempool: %v", err)
284
289
}
290
+
}
291
+
if m.didIndex != nil { // ← ADD THIS
292
+
m.didIndex.Close()
285
293
}
286
294
}
287
295
···
407
415
}
408
416
409
417
m.mempool = newMempool
418
+
419
+
// Update DID index if enabled
420
+
if m.didIndex != nil && m.didIndex.Exists() {
421
+
if err := m.UpdateDIDIndexForBundle(ctx, bundle); err != nil {
422
+
m.logger.Printf("Warning: failed to update DID index: %v", err)
423
+
}
424
+
}
410
425
411
426
return nil
412
427
}
···
1282
1297
1283
1298
return cursor
1284
1299
}
1300
+
1301
+
// GetDIDIndex returns the DID index manager. It may be nil when indexing is
// disabled, so callers must nil-check before use.
func (m *Manager) GetDIDIndex() *DIDIndexManager {
	return m.didIndex
}
1305
+
1306
+
// LoadOperation loads a single operation from a bundle efficiently.
// This is much faster than LoadBundle() when you only need one operation,
// since the underlying reader streams the compressed file only as far as the
// requested line.
//
// Steps: confirm the bundle is known to the index, bounds-check the position
// against BUNDLE_SIZE (a shorter final bundle is caught later by
// LoadOperationAtPosition), check the file exists, optionally verify the
// compressed-file hash (same policy as LoadBundle), then delegate to
// Operations.LoadOperationAtPosition.
func (m *Manager) LoadOperation(ctx context.Context, bundleNumber int, position int) (*plc.PLCOperation, error) {
	// Validate bundle exists in index
	meta, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		return nil, fmt.Errorf("bundle not in index: %w", err)
	}

	// Validate position
	if position < 0 || position >= BUNDLE_SIZE {
		return nil, fmt.Errorf("invalid position: %d (must be 0-%d)", position, BUNDLE_SIZE-1)
	}

	// Build file path (bundles are zero-padded six-digit .jsonl.zst files)
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	// Verify hash if enabled (same as LoadBundle)
	if m.config.VerifyOnLoad {
		valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
		if err != nil {
			return nil, fmt.Errorf("failed to verify hash: %w", err)
		}
		if !valid {
			return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash)
		}
	}

	// Load just the one operation (efficient!)
	return m.operations.LoadOperationAtPosition(path, position)
}
+50
bundle/operations.go
+50
bundle/operations.go
···
121
121
return contentHash, compressedHash, contentSize, compressedSize, nil
122
122
}
123
123
124
+
// LoadOperationAtPosition loads a single operation from a bundle without loading the entire bundle
125
+
// This is much more efficient for single-operation lookups
126
+
func (op *Operations) LoadOperationAtPosition(path string, position int) (*plc.PLCOperation, error) {
127
+
if position < 0 {
128
+
return nil, fmt.Errorf("invalid position: %d", position)
129
+
}
130
+
131
+
// Open compressed file
132
+
file, err := os.Open(path)
133
+
if err != nil {
134
+
return nil, fmt.Errorf("failed to open file: %w", err)
135
+
}
136
+
defer file.Close()
137
+
138
+
// Create zstd decompression reader (streaming, no full decompress)
139
+
reader := gozstd.NewReader(file)
140
+
defer reader.Close()
141
+
142
+
// Use scanner to skip to target line
143
+
scanner := bufio.NewScanner(reader)
144
+
buf := make([]byte, 0, 64*1024)
145
+
scanner.Buffer(buf, 1024*1024)
146
+
147
+
lineNum := 0
148
+
for scanner.Scan() {
149
+
if lineNum == position {
150
+
// Found target line!
151
+
line := scanner.Bytes()
152
+
153
+
var operation plc.PLCOperation
154
+
if err := json.UnmarshalNoEscape(line, &operation); err != nil {
155
+
return nil, fmt.Errorf("failed to parse operation at position %d: %w", position, err)
156
+
}
157
+
158
+
// Store raw JSON
159
+
operation.RawJSON = make([]byte, len(line))
160
+
copy(operation.RawJSON, line)
161
+
162
+
return &operation, nil
163
+
}
164
+
lineNum++
165
+
}
166
+
167
+
if err := scanner.Err(); err != nil {
168
+
return nil, fmt.Errorf("scanner error: %w", err)
169
+
}
170
+
171
+
return nil, fmt.Errorf("position %d not found (bundle has %d operations)", position, lineNum)
172
+
}
173
+
124
174
// ========================================
125
175
// STREAMING
126
176
// ========================================
+421
cmd/plcbundle/did_index.go
+421
cmd/plcbundle/did_index.go
···
1
+
package main
2
+
3
+
import (
4
+
"context"
5
+
"flag"
6
+
"fmt"
7
+
"os"
8
+
"time"
9
+
10
+
"github.com/goccy/go-json"
11
+
"tangled.org/atscan.net/plcbundle/bundle"
12
+
"tangled.org/atscan.net/plcbundle/plc"
13
+
)
14
+
15
+
func cmdDIDIndex() {
16
+
if len(os.Args) < 3 {
17
+
printDIDIndexUsage()
18
+
os.Exit(1)
19
+
}
20
+
21
+
subcommand := os.Args[2]
22
+
23
+
switch subcommand {
24
+
case "build":
25
+
cmdDIDIndexBuild()
26
+
case "stats":
27
+
cmdDIDIndexStats()
28
+
case "lookup":
29
+
cmdDIDIndexLookup()
30
+
case "resolve":
31
+
cmdDIDIndexResolve()
32
+
default:
33
+
fmt.Fprintf(os.Stderr, "Unknown index subcommand: %s\n", subcommand)
34
+
printDIDIndexUsage()
35
+
os.Exit(1)
36
+
}
37
+
}
38
+
39
+
// printDIDIndexUsage prints help text for the "index" subcommands to stdout.
// The body is a single raw-string literal; its content is user-facing output
// and must not be reworded casually.
func printDIDIndexUsage() {
	fmt.Printf(`Usage: plcbundle index <command> [options]

Commands:
  build      Build DID index from bundles
  stats      Show index statistics
  lookup     Lookup a specific DID
  resolve    Resolve DID to current document

Examples:
  plcbundle index build
  plcbundle index stats
  plcbundle index lookup -v did:plc:524tuhdhh3m7li5gycdn6boe
  plcbundle index resolve did:plc:524tuhdhh3m7li5gycdn6boe
`)
}
55
+
56
+
func cmdDIDIndexBuild() {
57
+
fs := flag.NewFlagSet("index build", flag.ExitOnError)
58
+
force := fs.Bool("force", false, "rebuild even if index exists")
59
+
fs.Parse(os.Args[3:])
60
+
61
+
mgr, dir, err := getManager("")
62
+
if err != nil {
63
+
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
64
+
os.Exit(1)
65
+
}
66
+
defer mgr.Close()
67
+
68
+
// Check if index exists
69
+
stats := mgr.GetDIDIndexStats()
70
+
if stats["exists"].(bool) && !*force {
71
+
fmt.Printf("DID index already exists (use --force to rebuild)\n")
72
+
fmt.Printf("Directory: %s\n", dir)
73
+
fmt.Printf("Total DIDs: %d\n", stats["total_dids"])
74
+
return
75
+
}
76
+
77
+
fmt.Printf("Building DID index in: %s\n", dir)
78
+
79
+
index := mgr.GetIndex()
80
+
bundleCount := index.Count()
81
+
82
+
if bundleCount == 0 {
83
+
fmt.Printf("No bundles to index\n")
84
+
return
85
+
}
86
+
87
+
fmt.Printf("Indexing %d bundles...\n\n", bundleCount)
88
+
89
+
progress := NewProgressBar(bundleCount)
90
+
91
+
start := time.Now()
92
+
ctx := context.Background()
93
+
94
+
err = mgr.BuildDIDIndex(ctx, func(current, total int) {
95
+
progress.Set(current)
96
+
})
97
+
98
+
progress.Finish()
99
+
100
+
if err != nil {
101
+
fmt.Fprintf(os.Stderr, "\nError building index: %v\n", err)
102
+
os.Exit(1)
103
+
}
104
+
105
+
elapsed := time.Since(start)
106
+
107
+
stats = mgr.GetDIDIndexStats()
108
+
109
+
fmt.Printf("\n✓ DID index built in %s\n", elapsed.Round(time.Millisecond))
110
+
fmt.Printf(" Total DIDs: %s\n", formatNumber(int(stats["total_dids"].(int64))))
111
+
fmt.Printf(" Shards: %d\n", stats["shard_count"])
112
+
fmt.Printf(" Location: %s/.plcbundle/\n", dir)
113
+
}
114
+
115
+
func cmdDIDIndexStats() {
116
+
mgr, dir, err := getManager("")
117
+
if err != nil {
118
+
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
119
+
os.Exit(1)
120
+
}
121
+
defer mgr.Close()
122
+
123
+
stats := mgr.GetDIDIndexStats()
124
+
125
+
if !stats["exists"].(bool) {
126
+
fmt.Printf("DID index does not exist\n")
127
+
fmt.Printf("Run: plcbundle index build\n")
128
+
return
129
+
}
130
+
131
+
fmt.Printf("\nDID Index Statistics\n")
132
+
fmt.Printf("════════════════════\n\n")
133
+
fmt.Printf(" Location: %s/.plcbundle/\n", dir)
134
+
fmt.Printf(" Total DIDs: %s\n", formatNumber(int(stats["total_dids"].(int64))))
135
+
fmt.Printf(" Shard count: %d\n", stats["shard_count"])
136
+
fmt.Printf(" Last bundle: %06d\n", stats["last_bundle"])
137
+
fmt.Printf(" Updated: %s\n", stats["updated_at"].(time.Time).Format("2006-01-02 15:04:05"))
138
+
fmt.Printf("\n")
139
+
fmt.Printf(" Cached shards: %d / %d\n", stats["cached_shards"], stats["cache_limit"])
140
+
141
+
if cachedList, ok := stats["cache_order"].([]int); ok && len(cachedList) > 0 {
142
+
fmt.Printf(" Hot shards: ")
143
+
for i, shard := range cachedList {
144
+
if i > 0 {
145
+
fmt.Printf(", ")
146
+
}
147
+
if i >= 10 {
148
+
fmt.Printf("... (+%d more)", len(cachedList)-10)
149
+
break
150
+
}
151
+
fmt.Printf("%02x", shard)
152
+
}
153
+
fmt.Printf("\n")
154
+
}
155
+
156
+
fmt.Printf("\n")
157
+
}
158
+
159
+
// cmdDIDIndexLookup looks up every operation for a single DID and prints a
// summary plus per-operation details, split into bundled and mempool sets.
//
// NOTE(review): flags are parsed from os.Args[3:], and Go's flag package stops
// parsing at the first non-flag argument — so "-v" is only honored BEFORE the
// DID (as in the usage example "lookup -v did:..."), despite the
// "<did> [-v]" usage string shown here.
func cmdDIDIndexLookup() {
	if len(os.Args) < 4 {
		fmt.Fprintf(os.Stderr, "Usage: plcbundle index lookup <did> [-v]\n")
		os.Exit(1)
	}

	fs := flag.NewFlagSet("index lookup", flag.ExitOnError)
	verbose := fs.Bool("v", false, "verbose debug output")
	fs.Parse(os.Args[3:])

	if fs.NArg() < 1 {
		fmt.Fprintf(os.Stderr, "Usage: plcbundle index lookup <did> [-v]\n")
		os.Exit(1)
	}

	did := fs.Arg(0)

	mgr, _, err := getManager("")
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
		os.Exit(1)
	}
	defer mgr.Close()

	// Warn (but continue) when no index exists: the manager-side lookup below
	// falls back to a full bundle scan.
	stats := mgr.GetDIDIndexStats()
	if !stats["exists"].(bool) {
		fmt.Fprintf(os.Stderr, "⚠️  DID index does not exist. Run: plcbundle index build\n")
		fmt.Fprintf(os.Stderr, "   Falling back to full scan (this will be slow)...\n\n")
	}

	fmt.Printf("Looking up: %s\n", did)
	if *verbose {
		fmt.Printf("Verbose mode: enabled\n")
	}
	fmt.Printf("\n")

	start := time.Now()
	ctx := context.Background()

	// Get bundled operations only
	bundledOps, err := mgr.GetDIDOperationsBundledOnly(ctx, did, *verbose)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
		os.Exit(1)
	}

	// Get mempool operations separately
	mempoolOps, err := mgr.GetDIDOperationsFromMempool(did)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error checking mempool: %v\n", err)
		os.Exit(1)
	}

	elapsed := time.Since(start)

	if len(bundledOps) == 0 && len(mempoolOps) == 0 {
		fmt.Printf("DID not found (searched in %s)\n", elapsed)
		return
	}

	// Count nullified operations
	nullifiedCount := 0
	for _, op := range bundledOps {
		if op.IsNullified() {
			nullifiedCount++
		}
	}

	// Display summary
	totalOps := len(bundledOps) + len(mempoolOps)
	fmt.Printf("Found %d total operations in %s\n", totalOps, elapsed)
	if len(bundledOps) > 0 {
		fmt.Printf("  Bundled: %d (%d active, %d nullified)\n", len(bundledOps), len(bundledOps)-nullifiedCount, nullifiedCount)
	}
	if len(mempoolOps) > 0 {
		fmt.Printf("  Mempool: %d (not yet bundled)\n", len(mempoolOps))
	}
	fmt.Printf("\n")

	// Show bundled operations (✓ = active, ✗ = nullified)
	if len(bundledOps) > 0 {
		fmt.Printf("Bundled operations:\n")
		for i, op := range bundledOps {
			status := "✓"
			if op.IsNullified() {
				status = "✗"
			}

			fmt.Printf("  %s %d. CID: %s\n", status, i+1, op.CID)
			fmt.Printf("       Time: %s\n", op.CreatedAt.Format("2006-01-02 15:04:05"))

			if op.IsNullified() {
				if nullCID := op.GetNullifyingCID(); nullCID != "" {
					fmt.Printf("       Nullified by: %s\n", nullCID)
				} else {
					fmt.Printf("       Nullified: true\n")
				}
			}
		}
		fmt.Printf("\n")
	}

	// Show mempool operations
	if len(mempoolOps) > 0 {
		fmt.Printf("Mempool operations (not yet bundled):\n")
		for i, op := range mempoolOps {
			status := "✓"
			if op.IsNullified() {
				status = "✗"
			}

			fmt.Printf("  %s %d. CID: %s\n", status, i+1, op.CID)
			fmt.Printf("       Time: %s\n", op.CreatedAt.Format("2006-01-02 15:04:05"))
		}
	}
}
275
+
276
+
// cmdDIDIndexResolve resolves a DID to its current DID document and prints the
// document as indented JSON on stdout. All progress/timing diagnostics go to
// stderr so stdout stays machine-parseable.
//
// Resolution order:
//   0. Mempool (freshest, not yet bundled) — latest non-nullified op wins.
//   1. DID index lookup -> latest (bundle, position) location (requires a
//      built index; exits with an error otherwise).
//   2. Load that bundle and build the document from the winning operation.
//
// NOTE(review): the document is built from the single latest operation, which
// assumes each PLC operation carries the full DID state — confirm against the
// plc package's data model.
func cmdDIDIndexResolve() {
	if len(os.Args) < 4 {
		fmt.Fprintf(os.Stderr, "Usage: plcbundle index resolve <did> [-v]\n")
		os.Exit(1)
	}

	fs := flag.NewFlagSet("index resolve", flag.ExitOnError)
	//verbose := fs.Bool("v", false, "verbose debug output")
	fs.Parse(os.Args[3:])

	if fs.NArg() < 1 {
		fmt.Fprintf(os.Stderr, "Usage: plcbundle index resolve <did> [-v]\n")
		os.Exit(1)
	}

	did := fs.Arg(0)

	mgr, _, err := getManager("")
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
		os.Exit(1)
	}
	defer mgr.Close()

	ctx := context.Background()
	fmt.Fprintf(os.Stderr, "Resolving: %s\n", did)

	start := time.Now()

	// STEP 0: Check mempool first (most recent data)
	mempoolStart := time.Now()
	var latestOp *plc.PLCOperation
	foundInMempool := false

	if mgr.GetMempool() != nil {
		mempoolOps, err := mgr.GetMempoolOperations()
		if err == nil && len(mempoolOps) > 0 {
			// Search backward for this DID — the first hit from the end is
			// the newest entry for it.
			for i := len(mempoolOps) - 1; i >= 0; i-- {
				if mempoolOps[i].DID == did && !mempoolOps[i].IsNullified() {
					latestOp = &mempoolOps[i]
					foundInMempool = true
					break
				}
			}
		}
	}
	mempoolTime := time.Since(mempoolStart)

	if foundInMempool {
		fmt.Fprintf(os.Stderr, "Mempool check: %s (✓ found in mempool)\n", mempoolTime)

		// Build document from mempool operation
		ops := []plc.PLCOperation{*latestOp}
		doc, err := plc.ResolveDIDDocument(did, ops)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Build document failed: %v\n", err)
			os.Exit(1)
		}

		totalTime := time.Since(start)
		fmt.Fprintf(os.Stderr, "Total: %s (resolved from mempool)\n\n", totalTime)

		// Output to stdout
		data, _ := json.MarshalIndent(doc, "", "  ")
		fmt.Println(string(data))
		return
	}

	fmt.Fprintf(os.Stderr, "Mempool check: %s (not found)\n", mempoolTime)

	// Not in mempool - check index
	stats := mgr.GetDIDIndexStats()
	if !stats["exists"].(bool) {
		fmt.Fprintf(os.Stderr, "⚠️  DID index does not exist. Run: plcbundle index build\n\n")
		os.Exit(1)
	}

	// STEP 1: Index lookup timing
	indexStart := time.Now()
	locations, err := mgr.GetDIDIndex().GetDIDLocations(did)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
		os.Exit(1)
	}
	indexTime := time.Since(indexStart)

	if len(locations) == 0 {
		fmt.Fprintf(os.Stderr, "DID not found in index or mempool\n")
		os.Exit(1)
	}

	// Find latest non-nullified location, ordered by (bundle, position)
	var latestLoc *bundle.OpLocation
	for i := range locations {
		if locations[i].Nullified {
			continue
		}
		if latestLoc == nil ||
			locations[i].Bundle > latestLoc.Bundle ||
			(locations[i].Bundle == latestLoc.Bundle && locations[i].Position > latestLoc.Position) {
			latestLoc = &locations[i]
		}
	}

	if latestLoc == nil {
		fmt.Fprintf(os.Stderr, "No valid operations (all nullified)\n")
		os.Exit(1)
	}

	fmt.Fprintf(os.Stderr, "Index lookup: %s (shard access)\n", indexTime)

	// STEP 2: Bundle loading timing
	bundleStart := time.Now()
	bndl, err := mgr.LoadBundle(ctx, int(latestLoc.Bundle))
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error loading bundle: %v\n", err)
		os.Exit(1)
	}
	bundleTime := time.Since(bundleStart)

	// Guard against a stale index entry pointing past the bundle's end.
	if int(latestLoc.Position) >= len(bndl.Operations) {
		fmt.Fprintf(os.Stderr, "Invalid position\n")
		os.Exit(1)
	}

	op := bndl.Operations[latestLoc.Position]

	fmt.Fprintf(os.Stderr, "Bundle load: %s (bundle %d, pos %d)\n",
		bundleTime, latestLoc.Bundle, latestLoc.Position)

	// STEP 3: Build DID document from the single latest operation
	ops := []plc.PLCOperation{op}
	doc, err := plc.ResolveDIDDocument(did, ops)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Build document failed: %v\n", err)
		os.Exit(1)
	}

	totalTime := time.Since(start)
	fmt.Fprintf(os.Stderr, "Total: %s\n\n", totalTime)

	// Output to stdout
	data, _ := json.MarshalIndent(doc, "", "  ")
	fmt.Println(string(data))
}
+4
-1
cmd/plcbundle/main.go
+4
-1
cmd/plcbundle/main.go
···
89
89
cmdCompare()
90
90
case "detector":
91
91
cmdDetector()
92
+
case "index":
93
+
cmdDIDIndex()
92
94
case "version":
93
95
fmt.Printf("plcbundle version %s\n", version)
94
96
fmt.Printf(" commit: %s\n", gitCommit)
···
117
119
mempool Show mempool status and operations
118
120
serve Start HTTP server to serve bundle data
119
121
compare Compare local index with target index
120
-
detector
122
+
detector Run spam detectors
123
+
index Manage DID position index
121
124
version Show version
122
125
123
126
Security Model:
+160
-3
cmd/plcbundle/server.go
+160
-3
cmd/plcbundle/server.go
···
160
160
}
161
161
fmt.Fprintf(w, "\n")
162
162
163
+
if didStats := mgr.GetDIDIndexStats(); didStats["exists"].(bool) {
164
+
fmt.Fprintf(w, "\nDID Index\n")
165
+
fmt.Fprintf(w, "━━━━━━━━━\n")
166
+
fmt.Fprintf(w, " Status: enabled\n")
167
+
fmt.Fprintf(w, " Total DIDs: %d\n", didStats["total_dids"])
168
+
fmt.Fprintf(w, " Cached shards: %d / %d\n",
169
+
didStats["cached_shards"], didStats["cache_limit"])
170
+
fmt.Fprintf(w, "\n")
171
+
}
172
+
163
173
fmt.Fprintf(w, "Server Stats\n")
164
174
fmt.Fprintf(w, "━━━━━━━━━━━━\n")
165
175
fmt.Fprintf(w, " Version: %s\n", version)
···
180
190
fmt.Fprintf(w, " GET /jsonl/:number Decompressed JSONL stream\n")
181
191
fmt.Fprintf(w, " GET /status Server status\n")
182
192
fmt.Fprintf(w, " GET /mempool Mempool operations (JSONL)\n")
193
+
194
+
fmt.Fprintf(w, "\nDID Resolution\n")
195
+
fmt.Fprintf(w, "━━━━━━━━━━━━━━\n")
196
+
fmt.Fprintf(w, " GET /:did DID Document (W3C format)\n")
197
+
fmt.Fprintf(w, " GET /:did/data PLC State (raw format)\n")
198
+
fmt.Fprintf(w, " GET /:did/log/audit Operation history\n")
199
+
fmt.Fprintf(w, "\n")
183
200
184
201
if wsEnabled {
185
202
fmt.Fprintf(w, "\nWebSocket Endpoints\n")
···
284
301
285
302
// Root - ASCII art + info
286
303
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
287
-
if r.URL.Path != "/" {
288
-
http.NotFound(w, r)
304
+
path := r.URL.Path
305
+
306
+
// DID Resolution - delegate to specific handler
307
+
if strings.HasPrefix(path, "/did:plc:") {
308
+
handleDIDEndpoint(w, r, mgr)
309
+
return
310
+
}
311
+
312
+
if path == "/" {
313
+
handleRoot(w, r, mgr, syncMode, wsEnabled)
289
314
return
290
315
}
291
-
handleRoot(w, r, mgr, syncMode, wsEnabled)
316
+
317
+
http.NotFound(w, r)
292
318
})
293
319
294
320
// Index JSON (reload from disk each time for fresh data during rebuild)
···
1023
1049
time.Sleep(500 * time.Millisecond)
1024
1050
}
1025
1051
}
1052
+
1053
+
// handleDIDEndpoint routes DID-related requests
1054
+
func handleDIDEndpoint(w http.ResponseWriter, r *http.Request, mgr *bundle.Manager) {
1055
+
path := r.URL.Path
1056
+
1057
+
// Parse DID and sub-path
1058
+
// Examples:
1059
+
// /did:plc:abc... -> DID document
1060
+
// /did:plc:abc.../data -> PLC state
1061
+
// /did:plc:abc.../log/audit -> Audit log
1062
+
1063
+
parts := strings.SplitN(strings.TrimPrefix(path, "/"), "/", 2)
1064
+
did := parts[0]
1065
+
1066
+
// Validate DID format
1067
+
if err := plc.ValidateDIDFormat(did); err != nil {
1068
+
http.Error(w, "Invalid DID format: "+err.Error(), http.StatusBadRequest)
1069
+
return
1070
+
}
1071
+
1072
+
// Route based on sub-path
1073
+
if len(parts) == 1 {
1074
+
// /did:plc:xxx -> DID document
1075
+
handleDIDDocument(w, r, mgr, did)
1076
+
} else if parts[1] == "data" {
1077
+
// /did:plc:xxx/data -> PLC state
1078
+
handleDIDData(w, r, mgr, did)
1079
+
} else if parts[1] == "log/audit" {
1080
+
// /did:plc:xxx/log/audit -> Audit log
1081
+
handleDIDAuditLog(w, r, mgr, did)
1082
+
} else {
1083
+
http.NotFound(w, r)
1084
+
}
1085
+
}
1086
+
1087
+
// handleDIDDocument resolves and returns the DID document (W3C format).
//
// The full operation log (bundles + mempool) is fetched, then resolved by the
// plc package. Missing DIDs return 404; a deactivated DID maps to 410 Gone;
// other failures map to 500.
func handleDIDDocument(w http.ResponseWriter, r *http.Request, mgr *bundle.Manager, did string) {
	ctx := r.Context()

	// Bundle package: just get the operations
	operations, err := mgr.GetDIDOperations(ctx, did, false)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	if len(operations) == 0 {
		http.NotFound(w, r)
		return
	}

	// PLC package: resolve to DID document
	doc, err := plc.ResolveDIDDocument(did, operations)
	if err != nil {
		// Deactivation is detected by error-message text — NOTE(review):
		// brittle; a typed sentinel error from the plc package would be safer.
		if strings.Contains(err.Error(), "deactivated") {
			http.Error(w, "DID has been deactivated", http.StatusGone)
		} else {
			http.Error(w, fmt.Sprintf("Resolution failed: %v", err), http.StatusInternalServerError)
		}
		return
	}

	w.Header().Set("Content-Type", "application/did+ld+json")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	data, err := json.MarshalIndent(doc, "", "  ")
	if err != nil {
		http.Error(w, "Failed to encode document", http.StatusInternalServerError)
		return
	}

	w.Write(data)
}
1125
+
1126
+
// handleDIDData returns PLC-specific state data for a DID (the raw "data"
// shape built by plc.BuildDIDState, not the W3C document).
//
// Missing DIDs return 404; deactivation maps to 410 Gone; other failures 500.
func handleDIDData(w http.ResponseWriter, r *http.Request, mgr *bundle.Manager, did string) {
	ctx := r.Context()

	// Bundle package: get operations
	operations, err := mgr.GetDIDOperations(ctx, did, false)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	if len(operations) == 0 {
		http.NotFound(w, r)
		return
	}

	// PLC package: build state
	state, err := plc.BuildDIDState(did, operations)
	if err != nil {
		// Same text-based deactivation detection as handleDIDDocument —
		// NOTE(review): a sentinel error would be more robust.
		if strings.Contains(err.Error(), "deactivated") {
			http.Error(w, "DID has been deactivated", http.StatusGone)
		} else {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	json.NewEncoder(w).Encode(state)
}
1158
+
1159
+
// handleDIDAuditLog returns the operation log for a DID
1160
+
func handleDIDAuditLog(w http.ResponseWriter, r *http.Request, mgr *bundle.Manager, did string) {
1161
+
ctx := r.Context()
1162
+
1163
+
// Bundle package: just retrieve operations
1164
+
operations, err := mgr.GetDIDOperations(ctx, did, false)
1165
+
if err != nil {
1166
+
http.Error(w, err.Error(), http.StatusInternalServerError)
1167
+
return
1168
+
}
1169
+
1170
+
if len(operations) == 0 {
1171
+
http.NotFound(w, r)
1172
+
return
1173
+
}
1174
+
1175
+
// PLC package: format audit log
1176
+
auditLog := plc.FormatAuditLog(operations)
1177
+
1178
+
w.Header().Set("Content-Type", "application/json")
1179
+
w.Header().Set("Access-Control-Allow-Origin", "*")
1180
+
1181
+
json.NewEncoder(w).Encode(auditLog)
1182
+
}
+311
plc/resolver.go
+311
plc/resolver.go
···
1
+
package plc
2
+
3
+
import (
4
+
"fmt"
5
+
"strings"
6
+
)
7
+
8
+
// DIDState represents the current state of a DID (PLC-specific format) — the
// cumulative result of applying the operation log via applyOperationToState.
type DIDState struct {
	DID                 string                       `json:"did"`
	RotationKeys        []string                     `json:"rotationKeys"`        // from the latest op's "rotationKeys"
	VerificationMethods map[string]string            `json:"verificationMethods"` // legacy "signingKey" maps to key "atproto"
	AlsoKnownAs         []string                     `json:"alsoKnownAs"`         // legacy "handle" maps to "at://<handle>"
	Services            map[string]ServiceDefinition `json:"services"`
}
16
+
17
+
// ServiceDefinition represents one entry in a DID's "services" map: a typed
// service endpoint.
type ServiceDefinition struct {
	Type     string `json:"type"`
	Endpoint string `json:"endpoint"`
}
22
+
23
+
// AuditLogEntry represents a single entry in a DID's audit log output.
type AuditLogEntry struct {
	DID       string      `json:"did"`
	Operation interface{} `json:"operation"` // The parsed operation data
	CID       string      `json:"cid"`
	Nullified interface{} `json:"nullified,omitempty"` // untyped — presumably bool or nullifying CID; confirm against producer
	CreatedAt string      `json:"createdAt"`
}
31
+
32
+
// ResolveDIDDocument constructs a DID document from operation log
33
+
// This is the main entry point for DID resolution
34
+
func ResolveDIDDocument(did string, operations []PLCOperation) (*DIDDocument, error) {
35
+
if len(operations) == 0 {
36
+
return nil, fmt.Errorf("no operations found for DID")
37
+
}
38
+
39
+
// Build current state from operations
40
+
state, err := BuildDIDState(did, operations)
41
+
if err != nil {
42
+
return nil, err
43
+
}
44
+
45
+
// Convert to DID document format
46
+
return StateToDIDDocument(state), nil
47
+
}
48
+
49
+
// BuildDIDState applies operations in order to build the current DID state.
//
// Nullified operations are skipped. Each remaining operation's parsed data is
// applied cumulatively (later operations overwrite earlier fields via
// applyOperationToState). A "plc_tombstone" operation means the DID was
// deactivated and resolution fails with an error. An error is also returned
// when no operation produced any state at all.
func BuildDIDState(did string, operations []PLCOperation) (*DIDState, error) {
	var state *DIDState

	for _, op := range operations {
		// Skip nullified operations
		if op.IsNullified() {
			continue
		}

		// Parse operation data
		opData, err := op.GetOperationData()
		if err != nil {
			return nil, fmt.Errorf("failed to parse operation: %w", err)
		}

		if opData == nil {
			continue
		}

		// Check operation type
		opType, _ := opData["type"].(string)

		// Handle tombstone (deactivated DID)
		if opType == "plc_tombstone" {
			return nil, fmt.Errorf("DID has been deactivated")
		}

		// Initialize state lazily on the first applicable operation
		if state == nil {
			state = &DIDState{DID: did}
		}

		// Apply operation to state (later ops overwrite earlier fields)
		applyOperationToState(state, opData)
	}

	if state == nil {
		return nil, fmt.Errorf("no valid operations found")
	}

	return state, nil
}
92
+
93
+
// applyOperationToState updates state with data from an operation
94
+
func applyOperationToState(state *DIDState, opData map[string]interface{}) {
95
+
// Update rotation keys
96
+
if rotKeys, ok := opData["rotationKeys"].([]interface{}); ok {
97
+
state.RotationKeys = make([]string, 0, len(rotKeys))
98
+
for _, k := range rotKeys {
99
+
if keyStr, ok := k.(string); ok {
100
+
state.RotationKeys = append(state.RotationKeys, keyStr)
101
+
}
102
+
}
103
+
}
104
+
105
+
// Update verification methods
106
+
if vm, ok := opData["verificationMethods"].(map[string]interface{}); ok {
107
+
state.VerificationMethods = make(map[string]string)
108
+
for key, val := range vm {
109
+
if valStr, ok := val.(string); ok {
110
+
state.VerificationMethods[key] = valStr
111
+
}
112
+
}
113
+
}
114
+
115
+
// Handle legacy signingKey format
116
+
if signingKey, ok := opData["signingKey"].(string); ok {
117
+
if state.VerificationMethods == nil {
118
+
state.VerificationMethods = make(map[string]string)
119
+
}
120
+
state.VerificationMethods["atproto"] = signingKey
121
+
}
122
+
123
+
// Update alsoKnownAs
124
+
if aka, ok := opData["alsoKnownAs"].([]interface{}); ok {
125
+
state.AlsoKnownAs = make([]string, 0, len(aka))
126
+
for _, a := range aka {
127
+
if akaStr, ok := a.(string); ok {
128
+
state.AlsoKnownAs = append(state.AlsoKnownAs, akaStr)
129
+
}
130
+
}
131
+
}
132
+
133
+
// Handle legacy handle format
134
+
if handle, ok := opData["handle"].(string); ok {
135
+
if len(state.AlsoKnownAs) == 0 {
136
+
state.AlsoKnownAs = []string{"at://" + handle}
137
+
}
138
+
}
139
+
140
+
// Update services
141
+
if services, ok := opData["services"].(map[string]interface{}); ok {
142
+
state.Services = make(map[string]ServiceDefinition)
143
+
for key, svc := range services {
144
+
if svcMap, ok := svc.(map[string]interface{}); ok {
145
+
svcType, _ := svcMap["type"].(string)
146
+
endpoint, _ := svcMap["endpoint"].(string)
147
+
state.Services[key] = ServiceDefinition{
148
+
Type: svcType,
149
+
Endpoint: normalizeServiceEndpoint(endpoint),
150
+
}
151
+
}
152
+
}
153
+
}
154
+
155
+
// Handle legacy service format
156
+
if service, ok := opData["service"].(string); ok {
157
+
if state.Services == nil {
158
+
state.Services = make(map[string]ServiceDefinition)
159
+
}
160
+
state.Services["atproto_pds"] = ServiceDefinition{
161
+
Type: "AtprotoPersonalDataServer",
162
+
Endpoint: normalizeServiceEndpoint(service),
163
+
}
164
+
}
165
+
}
166
+
167
+
// StateToDIDDocument converts internal PLC state to W3C DID document format
168
+
func StateToDIDDocument(state *DIDState) *DIDDocument {
169
+
// Detect key types to determine correct @context
170
+
contexts := []string{"https://www.w3.org/ns/did/v1"}
171
+
172
+
hasMultikey := false
173
+
hasSecp256k1 := false
174
+
hasP256 := false
175
+
176
+
// Check verification method key types
177
+
for _, didKey := range state.VerificationMethods {
178
+
keyType := detectKeyType(didKey)
179
+
switch keyType {
180
+
case "secp256k1":
181
+
hasSecp256k1 = true
182
+
case "p256":
183
+
hasP256 = true
184
+
default:
185
+
hasMultikey = true
186
+
}
187
+
}
188
+
189
+
// Add appropriate context URLs
190
+
if hasMultikey || hasSecp256k1 || hasP256 {
191
+
contexts = append(contexts, "https://w3id.org/security/multikey/v1")
192
+
}
193
+
if hasSecp256k1 {
194
+
contexts = append(contexts, "https://w3id.org/security/suites/secp256k1-2019/v1")
195
+
}
196
+
if hasP256 {
197
+
contexts = append(contexts, "https://w3id.org/security/suites/ecdsa-2019/v1")
198
+
}
199
+
200
+
doc := &DIDDocument{
201
+
Context: contexts,
202
+
ID: state.DID,
203
+
AlsoKnownAs: state.AlsoKnownAs,
204
+
}
205
+
206
+
// Convert services
207
+
for id, svc := range state.Services {
208
+
doc.Service = append(doc.Service, Service{
209
+
ID: "#" + id, // ← Just fragment (matching plc.directory)
210
+
Type: svc.Type,
211
+
ServiceEndpoint: svc.Endpoint,
212
+
})
213
+
}
214
+
215
+
// Keep verification methods with full DID (they're correct):
216
+
for id, didKey := range state.VerificationMethods {
217
+
doc.VerificationMethod = append(doc.VerificationMethod, VerificationMethod{
218
+
ID: state.DID + "#" + id, // ← Keep this as-is
219
+
Type: "Multikey",
220
+
Controller: state.DID,
221
+
PublicKeyMultibase: ExtractMultibaseFromDIDKey(didKey),
222
+
})
223
+
}
224
+
225
+
return doc
226
+
}
227
+
228
+
// detectKeyType detects the key type from a did:key string by inspecting
// the multicodec prefix of its base58btc multibase encoding ('z' marker):
//
//	zQ3... -> secp256k1
//	zDn... -> p256 (NIST P-256)
//	z6M... -> ed25519
//
// Anything else — including strings too short to carry a prefix or not
// base58btc-encoded at all — is reported as "unknown". Note the original
// indexed bytes 1 and 2 without verifying the leading 'z', so a value like
// "xQ3..." was misclassified; HasPrefix checks the whole prefix (and
// subsumes the length check).
func detectKeyType(didKey string) string {
	multibase := ExtractMultibaseFromDIDKey(didKey)

	switch {
	case strings.HasPrefix(multibase, "zQ3"):
		return "secp256k1"
	case strings.HasPrefix(multibase, "zDn"):
		return "p256"
	case strings.HasPrefix(multibase, "z6M"):
		return "ed25519"
	default:
		return "unknown"
	}
}

// ExtractMultibaseFromDIDKey extracts the multibase string from the
// "did:key:" format. Inputs without the prefix are returned unchanged.
func ExtractMultibaseFromDIDKey(didKey string) string {
	return strings.TrimPrefix(didKey, "did:key:")
}
254
+
255
+
// ValidateDIDFormat validates a did:plc identifier: the "did:plc:" prefix
// followed by exactly 24 characters of the base32 alphabet used by PLC
// (a-z and 2-7; the digits 0, 1, 8, 9 are excluded).
func ValidateDIDFormat(did string) error {
	if !strings.HasPrefix(did, "did:plc:") {
		return fmt.Errorf("invalid DID method: must start with 'did:plc:'")
	}

	// 8-byte prefix + 24-byte identifier = 32 bytes total.
	if len(did) != 32 {
		return fmt.Errorf("invalid DID length: expected 32 chars, got %d", len(did))
	}

	identifier := strings.TrimPrefix(did, "did:plc:")
	if len(identifier) != 24 {
		return fmt.Errorf("invalid identifier length: expected 24 chars, got %d", len(identifier))
	}

	// Every identifier character must belong to the base32 alphabet.
	for _, c := range identifier {
		isLower := c >= 'a' && c <= 'z'
		isDigit := c >= '2' && c <= '7'
		if !isLower && !isDigit {
			return fmt.Errorf("invalid character in identifier: %c (must be base32: a-z, 2-7)", c)
		}
	}

	return nil
}
280
+
281
+
// FormatAuditLog formats operations as an audit log
282
+
func FormatAuditLog(operations []PLCOperation) []AuditLogEntry {
283
+
log := make([]AuditLogEntry, 0, len(operations))
284
+
285
+
for _, op := range operations {
286
+
// Parse operation for the log
287
+
opData, _ := op.GetOperationData()
288
+
289
+
entry := AuditLogEntry{
290
+
DID: op.DID,
291
+
Operation: opData,
292
+
CID: op.CID,
293
+
Nullified: op.Nullified,
294
+
CreatedAt: op.CreatedAt.Format("2006-01-02T15:04:05.000Z"),
295
+
}
296
+
297
+
log = append(log, entry)
298
+
}
299
+
300
+
return log
301
+
}
302
+
303
+
// normalizeServiceEndpoint ensures a service endpoint carries a URL scheme.
// Endpoints already starting with http:// or https:// pass through
// untouched; bare legacy hostnames get an https:// prefix.
func normalizeServiceEndpoint(endpoint string) string {
	switch {
	case strings.HasPrefix(endpoint, "https://"), strings.HasPrefix(endpoint, "http://"):
		return endpoint
	default:
		return "https://" + endpoint
	}
}
+104
scripts/test-did-resolution.sh
+104
scripts/test-did-resolution.sh
···
1
+
#!/bin/bash
# test-did-resolution.sh
# Compares local DID resolution with plc.directory
#
# Usage: test-did-resolution.sh [BUNDLE_RANGE]
#   BUNDLE_RANGE defaults to "1" and is passed to `plcbundle export --bundles`.
#
# pipefail so a failing plcbundle/curl makes the whole pipeline fail
# instead of feeding jq empty input (which jq silently accepts).
set -eo pipefail

BUNDLE_RANGE="${1:-1}"
PLC_URL="https://plc.wtf"
TEMP_DIR=$(mktemp -d)
# Single quotes so TEMP_DIR expands when the trap fires; inner quotes so
# paths containing spaces are removed correctly.
trap 'rm -rf "$TEMP_DIR"' EXIT

# Colors
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

echo "Testing DID resolution for bundle(s): $BUNDLE_RANGE"
echo "Temp directory: $TEMP_DIR"
echo ""

# Extract unique DIDs from bundle(s)
echo "Extracting DIDs from bundles..."
DIDS=$(plcbundle export --bundles "$BUNDLE_RANGE" | jq -r '.did' | sort -u)

if [ -z "$DIDS" ]; then
    echo "No DIDs found in bundle(s): $BUNDLE_RANGE"
    exit 1
fi

DID_COUNT=$(echo "$DIDS" | wc -l | tr -d ' ')

echo "Found $DID_COUNT unique DIDs"
echo ""

# Test counters
TOTAL=0
PASSED=0
FAILED=0
ERRORS=0

# Test each DID
while IFS= read -r DID; do
    TOTAL=$((TOTAL + 1))

    # Progress
    echo -ne "\r[$TOTAL/$DID_COUNT] Testing $DID..."

    # Resolve locally
    LOCAL_FILE="$TEMP_DIR/local.json"
    if ! plcbundle index resolve "$DID" 2>/dev/null | jq --sort-keys . > "$LOCAL_FILE" 2>/dev/null; then
        echo -e "\r[$TOTAL/$DID_COUNT] ${RED}ERROR${NC} $DID (local resolution failed)"
        ERRORS=$((ERRORS + 1))
        continue
    fi

    # Resolve remotely; -f makes curl fail on HTTP errors so an error body
    # is counted as a skip rather than diffed against the local document.
    REMOTE_FILE="$TEMP_DIR/remote.json"
    if ! curl -sf "$PLC_URL/$DID" | jq --sort-keys . > "$REMOTE_FILE" 2>/dev/null; then
        echo -e "\r[$TOTAL/$DID_COUNT] ${YELLOW}SKIP${NC} $DID (remote fetch failed)"
        ERRORS=$((ERRORS + 1))
        continue
    fi

    # Compare
    if diff -q "$LOCAL_FILE" "$REMOTE_FILE" > /dev/null 2>&1; then
        echo -e "\r[$TOTAL/$DID_COUNT] ${GREEN}✓${NC} $DID"
        PASSED=$((PASSED + 1))
    else
        echo -e "\r[$TOTAL/$DID_COUNT] ${RED}✗${NC} $DID"
        echo ""
        echo "Differences found:"
        # `|| true` keeps set -e/pipefail from aborting: diff exits 1 when
        # the files differ, which is the expected case here.
        diff -u "$REMOTE_FILE" "$LOCAL_FILE" | head -20 || true
        echo ""
        FAILED=$((FAILED + 1))

        # Ask to continue. Read from the terminal, NOT stdin: stdin feeds
        # the surrounding while-loop from the DID list, so a plain `read`
        # would silently swallow the next DID.
        if [ "$FAILED" -ge 3 ]; then
            echo ""
            if read -p "Continue testing? (y/N) " -n 1 -r < /dev/tty; then
                echo
                if [[ ! $REPLY =~ ^[Yy]$ ]]; then
                    break
                fi
            else
                # No terminal available (e.g. CI): stop instead of looping.
                echo
                break
            fi
        fi
    fi

    # Rate limit
    sleep 0.1

done <<< "$DIDS"

# Summary
echo ""
echo "========================================="
echo "Test Results"
echo "========================================="
echo -e "Total: $TOTAL"
echo -e "${GREEN}Passed: $PASSED${NC}"
echo -e "${RED}Failed: $FAILED${NC}"
echo -e "${YELLOW}Errors: $ERRORS${NC}"
echo ""

if [ $FAILED -eq 0 ] && [ $ERRORS -eq 0 ]; then
    echo -e "${GREEN}✓ All tests passed!${NC}"
    exit 0
else
    echo -e "${RED}✗ Some tests failed${NC}"
    exit 1
fi