bundle/manager.go  (+11)
···
     return m.operations.LoadOperationAtPosition(path, position)
 }
 
+// LoadOperations loads multiple operations from a bundle efficiently
+func (m *Manager) LoadOperations(ctx context.Context, bundleNumber int, positions []int) (map[int]*plcclient.PLCOperation, error) {
+    // Build file path
+    path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
+    if !m.operations.FileExists(path) {
+        return nil, fmt.Errorf("bundle file not found: %s", path)
+    }
+
+    return m.operations.LoadOperationsAtPositions(path, positions)
+}
+
 // filterBundleFiles filters out files starting with . or _
 func filterBundleFiles(files []string) []string {
     filtered := make([]string, 0, len(files))
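
The new Manager.LoadOperations is the batch counterpart of LoadOperation: one call per bundle file instead of one call per operation. A minimal usage sketch, not from the patch itself; it assumes an already-initialized *bundle.Manager named mgr, and the bundle number and positions are made up for illustration:

    // Sketch only: bundle 42 and these positions are hypothetical.
    ops, err := mgr.LoadOperations(context.Background(), 42, []int{17, 2048, 90211})
    if err != nil {
        return err
    }
    for pos, op := range ops {
        // The result map is keyed by position; positions that were not found are simply absent.
        log.Printf("position %d: %d bytes of raw JSON", pos, len(op.RawJSON))
    }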

cmd/plcbundle/commands/common.go  (+1)
···
     GetDIDOperationsFromMempool(did string) ([]plcclient.PLCOperation, error)
     GetLatestDIDOperation(ctx context.Context, did string) (*plcclient.PLCOperation, error)
     LoadOperation(ctx context.Context, bundleNum, position int) (*plcclient.PLCOperation, error)
+    LoadOperations(ctx context.Context, bundleNumber int, positions []int) (map[int]*plcclient.PLCOperation, error)
     CloneFromRemote(ctx context.Context, opts internalsync.CloneOptions) (*internalsync.CloneResult, error)
     ResolveDID(ctx context.Context, did string) (*bundle.ResolveDIDResult, error)
     RunSyncOnce(ctx context.Context, config *internalsync.SyncLoopConfig, verbose bool) (int, error)
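
Because BundleManager is the interface the did commands program against, the concrete manager must grow the same method. A compile-time assertion is a cheap way to catch a mismatch early; this is a sketch only, and it assumes the concrete type backing the interface is *bundle.Manager:

    // Fails to compile if *bundle.Manager stops satisfying BundleManager,
    // for example after the LoadOperations addition above.
    var _ BundleManager = (*bundle.Manager)(nil)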

cmd/plcbundle/commands/did.go  (+188, -46)
···
     "github.com/goccy/go-json"
     "github.com/spf13/cobra"
     "tangled.org/atscan.net/plcbundle/cmd/plcbundle/ui"
+    "tangled.org/atscan.net/plcbundle/internal/didindex"
     "tangled.org/atscan.net/plcbundle/internal/plcclient"
 )
 
···
 }
 
 func batchResolve(mgr BundleManager, dids []string, output *os.File, workers int) error {
-    progress := ui.NewProgressBar(len(dids))
     ctx := context.Background()
+    overallStart := time.Now()
+
+    // =================================================================
+    // PHASE 1: Batch lookup locations
+    // =================================================================
+    fmt.Fprintf(os.Stderr, "Phase 1/2: Looking up %d DID locations...\n", len(dids))
+    phase1Start := time.Now()
 
-    type resolveResult struct {
-        did string
-        doc *plcclient.DIDDocument
-        err error
+    type didLocation struct {
+        did         string
+        bundle      int
+        position    int
+        fromMempool bool
+        mempoolOp   *plcclient.PLCOperation
+        err         error
     }
 
-    // Now use it
-    jobs := make(chan string, len(dids))
-    results := make(chan resolveResult, len(dids))
+    locations := make([]didLocation, len(dids))
+
+    // Mempool check
+    for i, did := range dids {
+        if mempoolOp := findLatestInMempool(mgr, did); mempoolOp != nil {
+            locations[i] = didLocation{did: did, fromMempool: true, mempoolOp: mempoolOp}
+        }
+    }
+
+    // Batch index lookup
+    needsLookup := make([]string, 0)
+    lookupMap := make(map[string]int)
+    for i, did := range dids {
+        if !locations[i].fromMempool {
+            needsLookup = append(needsLookup, did)
+            lookupMap[did] = i
+        }
+    }
 
-    // Start worker pool
-    var wg sync.WaitGroup
-    for w := 0; w < workers; w++ {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
-            for did := range jobs {
-                result, err := mgr.ResolveDID(ctx, did)
-                if err != nil {
-                    results <- resolveResult{did: did, err: err}
-                } else {
-                    results <- resolveResult{did: did, doc: result.Document}
+    if len(needsLookup) > 0 {
+        batchResults, _ := mgr.GetDIDIndex().BatchGetDIDLocations(needsLookup)
+        for did, locs := range batchResults {
+            var latest *didindex.OpLocation
+            for i := range locs {
+                if !locs[i].Nullified() && (latest == nil || locs[i].IsAfter(*latest)) {
+                    latest = &locs[i]
                 }
             }
-        }()
+
+            idx := lookupMap[did]
+            if latest != nil {
+                locations[idx] = didLocation{did: did, bundle: latest.BundleInt(), position: latest.PositionInt()}
+            } else {
+                locations[idx] = didLocation{did: did, err: fmt.Errorf("not found")}
+            }
+        }
+
+        for _, did := range needsLookup {
+            if idx := lookupMap[did]; locations[idx].bundle == 0 && locations[idx].err == nil {
+                locations[idx] = didLocation{did: did, err: fmt.Errorf("not found")}
+            }
+        }
     }
 
-    // Send jobs
-    for _, did := range dids {
-        jobs <- did
+    phase1Duration := time.Since(phase1Start)
+    fmt.Fprintf(os.Stderr, "  ✓ %s\n\n", phase1Duration.Round(time.Millisecond))
+
+    // =================================================================
+    // PHASE 2: Group by bundle, load ops, resolve (MERGED, parallel)
+    // =================================================================
+    fmt.Fprintf(os.Stderr, "Phase 2/2: Loading and resolving (%d workers)...\n", workers)
+
+    // Group DIDs by bundle
+    type bundleGroup struct {
+        bundleNum int
+        dids      []int // indices into locations array
+    }
+
+    bundleMap := make(map[int][]int)
+    mempoolDIDs := make([]int, 0)
+    errorDIDs := make([]int, 0)
+
+    for i, loc := range locations {
+        if loc.err != nil {
+            errorDIDs = append(errorDIDs, i)
+        } else if loc.fromMempool {
+            mempoolDIDs = append(mempoolDIDs, i)
+        } else {
+            bundleMap[loc.bundle] = append(bundleMap[loc.bundle], i)
+        }
     }
-    close(jobs)
 
-    // Collect results in background
-    go func() {
-        wg.Wait()
-        close(results)
-    }()
+    bundles := make([]bundleGroup, 0, len(bundleMap))
+    for bn, didIndices := range bundleMap {
+        bundles = append(bundles, bundleGroup{bundleNum: bn, dids: didIndices})
+    }
 
-    // Process results
+    fmt.Fprintf(os.Stderr, "  %d bundles, %d mempool, %d errors\n",
+        len(bundles), len(mempoolDIDs), len(errorDIDs))
+
+    // Setup output
     writer := bufio.NewWriterSize(output, 512*1024)
     defer writer.Flush()
 
-    resolved := 0
-    failed := 0
-    processed := 0
+    var (
+        resolved  int
+        failed    int
+        processed int
+        mu        sync.Mutex // Single lock for all counters
+    )
+
+    progress := ui.NewProgressBar(len(dids))
+
+    writeResult := func(doc *plcclient.DIDDocument, err error) {
+        mu.Lock()
+        defer mu.Unlock()
 
-    for res := range results {
         processed++
+        progress.Set(processed)
 
-        if res.err != nil {
+        if err != nil {
             failed++
-            if failed < 10 {
-                fmt.Fprintf(os.Stderr, "Failed to resolve %s: %v\n", res.did, res.err)
-            }
         } else {
             resolved++
-            data, _ := json.Marshal(res.doc)
+            data, _ := json.Marshal(doc)
             writer.Write(data)
             writer.WriteByte('\n')
-
-            if processed%100 == 0 {
+            if resolved%100 == 0 {
                 writer.Flush()
             }
         }
+    }
 
-        progress.Set(processed)
+    // Process mempool DIDs (already have ops)
+    for _, idx := range mempoolDIDs {
+        loc := locations[idx]
+        doc, err := plcclient.ResolveDIDDocument(loc.did, []plcclient.PLCOperation{*loc.mempoolOp})
+        writeResult(doc, err)
+    }
+
+    // Process errors
+    for range errorDIDs {
+        writeResult(nil, fmt.Errorf("not found"))
     }
 
+    // ✨ Process bundles in parallel - LoadOperations once per bundle
+    bundleJobs := make(chan bundleGroup, len(bundles))
+    var wg sync.WaitGroup
+
+    for w := 0; w < workers; w++ {
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+
+            for job := range bundleJobs {
+                // Collect all positions needed from this bundle
+                positions := make([]int, len(job.dids))
+                for i, didIdx := range job.dids {
+                    positions[i] = locations[didIdx].position
+                }
+
+                // ✨ Load operations once for this bundle
+                ops, err := mgr.LoadOperations(ctx, job.bundleNum, positions)
+
+                if err != nil {
+                    // All DIDs from this bundle fail
+                    for range job.dids {
+                        writeResult(nil, err)
+                    }
+                    continue
+                }
+
+                // Resolve each DID using loaded operations
+                for i, didIdx := range job.dids {
+                    loc := locations[didIdx]
+
+                    if op, ok := ops[positions[i]]; ok {
+                        doc, resolveErr := plcclient.ResolveDIDDocument(loc.did, []plcclient.PLCOperation{*op})
+                        writeResult(doc, resolveErr)
+                    } else {
+                        writeResult(nil, fmt.Errorf("operation not loaded"))
+                    }
+                }
+            }
+        }()
+    }
+
+    // Send bundle jobs
+    for _, bg := range bundles {
+        bundleJobs <- bg
+    }
+    close(bundleJobs)
+
+    wg.Wait()
     writer.Flush()
     progress.Finish()
+
+    totalDuration := time.Since(overallStart)
 
     fmt.Fprintf(os.Stderr, "\n✓ Batch resolve complete\n")
-    fmt.Fprintf(os.Stderr, "  DIDs input: %d\n", len(dids))
-    fmt.Fprintf(os.Stderr, "  Resolved: %d\n", resolved)
+    fmt.Fprintf(os.Stderr, "  Resolved: %d/%d\n", resolved, len(dids))
     if failed > 0 {
-        fmt.Fprintf(os.Stderr, "  Failed: %d\n", failed)
+        fmt.Fprintf(os.Stderr, "  Failed: %d\n", failed)
+    }
+    fmt.Fprintf(os.Stderr, "  Total: %s (%.1f DIDs/sec)\n",
+        totalDuration.Round(time.Millisecond),
+        float64(resolved)/totalDuration.Seconds())
+
+    return nil
+}
+
+// Helper function to find latest non-nullified op in mempool
+func findLatestInMempool(mgr BundleManager, did string) *plcclient.PLCOperation {
+    ops, err := mgr.GetDIDOperationsFromMempool(did)
+    if err != nil || len(ops) == 0 {
+        return nil
+    }
+
+    // Search backwards from most recent
+    for i := len(ops) - 1; i >= 0; i-- {
+        if !ops[i].IsNullified() {
+            return &ops[i]
+        }
     }
 
     return nil
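
batchResolve now runs in two phases: phase 1 maps every DID to either a mempool operation or a (bundle, position) pair through one batched index lookup, and phase 2 groups the hits by bundle so each bundle file is opened and decompressed once per group rather than once per DID; the mutex-guarded writeResult closure keeps the shared bufio writer and counters safe across workers. A minimal invocation sketch, not part of the patch; it assumes the surrounding did command has already read the DID list and constructed the manager, and the worker count of 8 is arbitrary:

    // dids comes from the command's input; output here is just stdout.
    if err := batchResolve(mgr, dids, os.Stdout, 8); err != nil {
        return fmt.Errorf("batch resolve: %w", err)
    }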

internal/didindex/lookup.go  (+84)
···
     "context"
     "fmt"
     "sort"
+    "sync"
 
     "tangled.org/atscan.net/plcbundle/internal/plcclient"
 )
···
     // Load ONLY the specific operation (efficient!)
     return provider.LoadOperation(ctx, latestLoc.BundleInt(), latestLoc.PositionInt())
 }
+
+// BatchGetDIDLocations retrieves locations for multiple DIDs efficiently
+// Returns map[did][]OpLocation - only locations, no operation loading
+func (dim *Manager) BatchGetDIDLocations(dids []string) (map[string][]OpLocation, error) {
+    if !dim.Exists() {
+        return nil, fmt.Errorf("DID index not available")
+    }
+
+    // Group DIDs by shard to minimize shard loads
+    type shardQuery struct {
+        shardNum    uint8
+        identifiers []string
+        didMap      map[string]string // identifier -> original DID
+    }
+
+    shardQueries := make(map[uint8]*shardQuery)
+
+    for _, did := range dids {
+        identifier, err := extractDIDIdentifier(did)
+        if err != nil {
+            continue
+        }
+
+        shardNum := dim.calculateShard(identifier)
+
+        if shardQueries[shardNum] == nil {
+            shardQueries[shardNum] = &shardQuery{
+                shardNum:    shardNum,
+                identifiers: make([]string, 0),
+                didMap:      make(map[string]string),
+            }
+        }
+
+        sq := shardQueries[shardNum]
+        sq.identifiers = append(sq.identifiers, identifier)
+        sq.didMap[identifier] = did
+    }
+
+    if dim.verbose {
+        dim.logger.Printf("DEBUG: Batch lookup: %d DIDs across %d shards", len(dids), len(shardQueries))
+    }
+
+    // Process each shard (load once, search multiple times)
+    results := make(map[string][]OpLocation)
+    var mu sync.Mutex
+
+    var wg sync.WaitGroup
+    for _, sq := range shardQueries {
+        wg.Add(1)
+        go func(query *shardQuery) {
+            defer wg.Done()
+
+            // Load shard once
+            shard, err := dim.loadShard(query.shardNum)
+            if err != nil {
+                if dim.verbose {
+                    dim.logger.Printf("DEBUG: Failed to load shard %02x: %v", query.shardNum, err)
+                }
+                return
+            }
+            defer dim.releaseShard(shard)
+
+            if shard.data == nil {
+                return
+            }
+
+            // Search for all identifiers in this shard
+            for _, identifier := range query.identifiers {
+                locations := dim.searchShard(shard, identifier)
+                if len(locations) > 0 {
+                    originalDID := query.didMap[identifier]
+                    mu.Lock()
+                    results[originalDID] = locations
+                    mu.Unlock()
+                }
+            }
+        }(sq)
+    }
+
+    wg.Wait()
+
+    return results, nil
+}
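
Callers obtain the index from the bundle manager and pick the newest non-nullified location per DID themselves, as the did.go change above does. A short sketch under those assumptions (mgr is an already-constructed bundle manager, dids is a hypothetical non-empty slice):

    byDID, err := mgr.GetDIDIndex().BatchGetDIDLocations(dids)
    if err != nil {
        return err
    }
    for did, locs := range byDID {
        // Keep only the most recent non-nullified location for each DID.
        var latest *didindex.OpLocation
        for i := range locs {
            if !locs[i].Nullified() && (latest == nil || locs[i].IsAfter(*latest)) {
                latest = &locs[i]
            }
        }
        if latest != nil {
            fmt.Printf("%s -> bundle %d, position %d\n", did, latest.BundleInt(), latest.PositionInt())
        }
    }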

internal/storage/storage.go  (+71)
···
 
     return operations[startIdx:]
 }
+
+// LoadOperationsAtPositions loads multiple operations from a bundle in one pass
+func (op *Operations) LoadOperationsAtPositions(path string, positions []int) (map[int]*plcclient.PLCOperation, error) {
+    if len(positions) == 0 {
+        return make(map[int]*plcclient.PLCOperation), nil
+    }
+
+    // Create position set for fast lookup
+    posSet := make(map[int]bool)
+    maxPos := 0
+    for _, pos := range positions {
+        if pos < 0 {
+            continue
+        }
+        posSet[pos] = true
+        if pos > maxPos {
+            maxPos = pos
+        }
+    }
+
+    file, err := os.Open(path)
+    if err != nil {
+        return nil, fmt.Errorf("failed to open file: %w", err)
+    }
+    defer file.Close()
+
+    reader := gozstd.NewReader(file)
+    defer reader.Close()
+
+    bufPtr := scannerBufPool.Get().(*[]byte)
+    defer scannerBufPool.Put(bufPtr)
+
+    scanner := bufio.NewScanner(reader)
+    scanner.Buffer(*bufPtr, 512*1024)
+
+    results := make(map[int]*plcclient.PLCOperation)
+    lineNum := 0
+
+    for scanner.Scan() {
+        // Early exit if we found everything
+        if len(results) == len(posSet) {
+            break
+        }
+
+        // Only parse if this position is requested
+        if posSet[lineNum] {
+            line := scanner.Bytes()
+            var operation plcclient.PLCOperation
+            if err := json.UnmarshalNoEscape(line, &operation); err != nil {
+                return nil, fmt.Errorf("failed to parse operation at position %d: %w", lineNum, err)
+            }
+
+            operation.RawJSON = make([]byte, len(line))
+            copy(operation.RawJSON, line)
+            results[lineNum] = &operation
+        }
+
+        lineNum++
+
+        // Early exit if we passed the max position we need
+        if lineNum > maxPos {
+            break
+        }
+    }
+
+    if err := scanner.Err(); err != nil {
+        return nil, fmt.Errorf("scanner error: %w", err)
+    }
+
+    return results, nil
+}
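
LoadOperationsAtPositions streams the decompressed JSONL exactly once and exits early: for positions {5, 120} it decodes only lines 5 and 120 and stops scanning after line 120, no matter how long the bundle is. A minimal sketch of calling it directly; the receiver value and the path are hypothetical, and callers normally go through Manager.LoadOperations above instead:

    // store is an assumed *Operations value; the bundle path is made up for illustration.
    ops, err := store.LoadOperationsAtPositions("/data/bundles/000042.jsonl.zst", []int{5, 120})
    if err != nil {
        return err
    }
    if _, ok := ops[120]; !ok {
        // Positions past the end of the file are simply missing from the map.
    }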