A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory

multi-thread in batchresolve

+54 -12
+54 -12
cmd/plcbundle/commands/did.go
··· 6 6 "fmt" 7 7 "os" 8 8 "strings" 9 + "sync" 9 10 "time" 10 11 11 12 "github.com/goccy/go-json" ··· 889 890 return nil 890 891 } 891 892 892 - func batchResolve(mgr BundleManager, dids []string, output *os.File, _ int) error { 893 + func batchResolve(mgr BundleManager, dids []string, output *os.File, workers int) error { 893 894 progress := ui.NewProgressBar(len(dids)) 894 895 ctx := context.Background() 895 896 896 - resolved := 0 897 - failed := 0 897 + type resolveResult struct { 898 + did string 899 + doc *plcclient.DIDDocument 900 + err error 901 + } 902 + 903 + // Now use it 904 + jobs := make(chan string, len(dids)) 905 + results := make(chan resolveResult, len(dids)) 906 + 907 + // Start worker pool 908 + var wg sync.WaitGroup 909 + for w := 0; w < workers; w++ { 910 + wg.Add(1) 911 + go func() { 912 + defer wg.Done() 913 + for did := range jobs { 914 + result, err := mgr.ResolveDID(ctx, did) 915 + if err != nil { 916 + results <- resolveResult{did: did, err: err} 917 + } else { 918 + results <- resolveResult{did: did, doc: result.Document} 919 + } 920 + } 921 + }() 922 + } 923 + 924 + // Send jobs 925 + for _, did := range dids { 926 + jobs <- did 927 + } 928 + close(jobs) 898 929 899 - // Use buffered writer 930 + // Collect results in background 931 + go func() { 932 + wg.Wait() 933 + close(results) 934 + }() 935 + 936 + // Process results 900 937 writer := bufio.NewWriterSize(output, 512*1024) 901 938 defer writer.Flush() 902 939 903 - for i, did := range dids { 904 - result, err := mgr.ResolveDID(ctx, did) 905 - if err != nil { 940 + resolved := 0 941 + failed := 0 942 + processed := 0 943 + 944 + for res := range results { 945 + processed++ 946 + 947 + if res.err != nil { 906 948 failed++ 907 - if i < 10 { 908 - fmt.Fprintf(os.Stderr, "Failed to resolve %s: %v\n", did, err) 949 + if failed < 10 { 950 + fmt.Fprintf(os.Stderr, "Failed to resolve %s: %v\n", res.did, res.err) 909 951 } 910 952 } else { 911 953 resolved++ 912 - data, _ := json.Marshal(result.Document) 954 + data, _ := json.Marshal(res.doc) 913 955 writer.Write(data) 914 956 writer.WriteByte('\n') 915 957 916 - if i%100 == 0 { 958 + if processed%100 == 0 { 917 959 writer.Flush() 918 960 } 919 961 } 920 962 921 - progress.Set(i + 1) 963 + progress.Set(processed) 922 964 } 923 965 924 966 writer.Flush()