From f30044a03bc7cf107dbec03c02fb6d0072878252 Mon Sep 17 00:00:00 2001 From: Michael Matloob Date: Thu, 25 Jun 2020 19:11:28 -0400 Subject: [PATCH] cmd/go/internal: remove some users of par.Work par.Work is used in a number of places as a parallel work queue. This change replaces it with goroutines and channels in a number of simpler places where it's used. Change-Id: I0620eda46ec7b2c0599a8b9361639af7bb73a05a Reviewed-on: https://go-review.googlesource.com/c/go/+/248326 Run-TryBot: Michael Matloob TryBot-Result: Gobot Gobot Reviewed-by: Bryan C. Mills --- src/cmd/go/internal/modcmd/download.go | 69 +++++++++++++++----------- src/cmd/go/internal/modcmd/graph.go | 19 +++---- src/cmd/go/internal/modconv/convert.go | 59 ++++++++++++---------- src/cmd/go/internal/modget/get.go | 41 ++++++++++----- src/cmd/go/internal/modload/list.go | 37 +++++++++----- 5 files changed, 133 insertions(+), 92 deletions(-) diff --git a/src/cmd/go/internal/modcmd/download.go b/src/cmd/go/internal/modcmd/download.go index 946e8ed3cf..857362a72e 100644 --- a/src/cmd/go/internal/modcmd/download.go +++ b/src/cmd/go/internal/modcmd/download.go @@ -5,15 +5,15 @@ package modcmd import ( + "cmd/go/internal/modfetch" "context" "encoding/json" "os" + "runtime" "cmd/go/internal/base" "cmd/go/internal/cfg" - "cmd/go/internal/modfetch" "cmd/go/internal/modload" - "cmd/go/internal/par" "cmd/go/internal/work" "golang.org/x/mod/module" @@ -102,33 +102,7 @@ func runDownload(ctx context.Context, cmd *base.Command, args []string) { } } - var mods []*moduleJSON - var work par.Work - listU := false - listVersions := false - for _, info := range modload.ListModules(ctx, args, listU, listVersions) { - if info.Replace != nil { - info = info.Replace - } - if info.Version == "" && info.Error == nil { - // main module or module replaced with file path. - // Nothing to download. - continue - } - m := &moduleJSON{ - Path: info.Path, - Version: info.Version, - } - mods = append(mods, m) - if info.Error != nil { - m.Error = info.Error.Err - continue - } - work.Add(m) - } - - work.Do(10, func(item interface{}) { - m := item.(*moduleJSON) + downloadModule := func(m *moduleJSON) { var err error m.Info, err = modfetch.InfoFile(m.Path, m.Version) if err != nil { @@ -157,7 +131,42 @@ func runDownload(ctx context.Context, cmd *base.Command, args []string) { m.Error = err.Error() return } - }) + } + + var mods []*moduleJSON + listU := false + listVersions := false + type token struct{} + sem := make(chan token, runtime.GOMAXPROCS(0)) + for _, info := range modload.ListModules(ctx, args, listU, listVersions) { + if info.Replace != nil { + info = info.Replace + } + if info.Version == "" && info.Error == nil { + // main module or module replaced with file path. + // Nothing to download. + continue + } + m := &moduleJSON{ + Path: info.Path, + Version: info.Version, + } + mods = append(mods, m) + if info.Error != nil { + m.Error = info.Error.Err + continue + } + sem <- token{} + go func() { + downloadModule(m) + <-sem + }() + } + + // Fill semaphore channel to wait for goroutines to finish. + for n := cap(sem); n > 0; n-- { + sem <- token{} + } if *downloadJSON { for _, m := range mods { diff --git a/src/cmd/go/internal/modcmd/graph.go b/src/cmd/go/internal/modcmd/graph.go index 4853503fd4..6da12b9cab 100644 --- a/src/cmd/go/internal/modcmd/graph.go +++ b/src/cmd/go/internal/modcmd/graph.go @@ -15,7 +15,6 @@ import ( "cmd/go/internal/base" "cmd/go/internal/cfg" "cmd/go/internal/modload" - "cmd/go/internal/par" "cmd/go/internal/work" "golang.org/x/mod/module" @@ -59,23 +58,25 @@ func runGraph(ctx context.Context, cmd *base.Command, args []string) { return m.Path + "@" + m.Version } - // Note: using par.Work only to manage work queue. - // No parallelism here, so no locking. var out []string var deps int // index in out where deps start - var work par.Work - work.Add(modload.Target) - work.Do(1, func(item interface{}) { - m := item.(module.Version) + seen := map[module.Version]bool{modload.Target: true} + queue := []module.Version{modload.Target} + for len(queue) > 0 { + var m module.Version + m, queue = queue[0], queue[1:] list, _ := reqs.Required(m) for _, r := range list { - work.Add(r) + if !seen[r] { + queue = append(queue, r) + seen[r] = true + } out = append(out, format(m)+" "+format(r)+"\n") } if m == modload.Target { deps = len(out) } - }) + } sort.Slice(out[deps:], func(i, j int) bool { return out[deps+i][0] < out[deps+j][0] diff --git a/src/cmd/go/internal/modconv/convert.go b/src/cmd/go/internal/modconv/convert.go index f465a9f395..d5a0bc21e9 100644 --- a/src/cmd/go/internal/modconv/convert.go +++ b/src/cmd/go/internal/modconv/convert.go @@ -7,13 +7,12 @@ package modconv import ( "fmt" "os" + "runtime" "sort" "strings" - "sync" "cmd/go/internal/base" "cmd/go/internal/modfetch" - "cmd/go/internal/par" "golang.org/x/mod/modfile" "golang.org/x/mod/module" @@ -42,46 +41,52 @@ func ConvertLegacyConfig(f *modfile.File, file string, data []byte) error { // Convert requirements block, which may use raw SHA1 hashes as versions, // to valid semver requirement list, respecting major versions. - var ( - work par.Work - mu sync.Mutex - need = make(map[string]string) - replace = make(map[string]*modfile.Replace) - ) + versions := make([]*module.Version, len(mf.Require)) + replace := make(map[string]*modfile.Replace) for _, r := range mf.Replace { replace[r.New.Path] = r replace[r.Old.Path] = r } - for _, r := range mf.Require { + + type token struct{} + sem := make(chan token, runtime.GOMAXPROCS(0)) + for i, r := range mf.Require { m := r.Mod if m.Path == "" { continue } if re, ok := replace[m.Path]; ok { - work.Add(re.New) - continue + m = re.New } - work.Add(r.Mod) + sem <- token{} + go func(i int, m module.Version) { + repo, info, err := modfetch.ImportRepoRev(m.Path, m.Version) + if err != nil { + fmt.Fprintf(os.Stderr, "go: converting %s: stat %s@%s: %v\n", base.ShortPath(file), m.Path, m.Version, err) + return + } + + path := repo.ModulePath() + versions[i].Path = path + versions[i].Version = info.Version + + <-sem + }(i, m) + } + // Fill semaphore channel to wait for all tasks to finish. + for n := cap(sem); n > 0; n-- { + sem <- token{} } - work.Do(10, func(item interface{}) { - r := item.(module.Version) - repo, info, err := modfetch.ImportRepoRev(r.Path, r.Version) - if err != nil { - fmt.Fprintf(os.Stderr, "go: converting %s: stat %s@%s: %v\n", base.ShortPath(file), r.Path, r.Version, err) - return - } - mu.Lock() - path := repo.ModulePath() + need := map[string]string{} + for _, v := range versions { // Don't use semver.Max here; need to preserve +incompatible suffix. - if v, ok := need[path]; !ok || semver.Compare(v, info.Version) < 0 { - need[path] = info.Version + if needv, ok := need[v.Path]; !ok || semver.Compare(needv, v.Version) < 0 { + need[v.Path] = v.Version } - mu.Unlock() - }) - - var paths []string + } + paths := make([]string, 0, len(need)) for path := range need { paths = append(paths, path) } diff --git a/src/cmd/go/internal/modget/get.go b/src/cmd/go/internal/modget/get.go index 93a6bb54d5..d02c9a8da5 100644 --- a/src/cmd/go/internal/modget/get.go +++ b/src/cmd/go/internal/modget/get.go @@ -11,6 +11,7 @@ import ( "fmt" "os" "path/filepath" + "runtime" "sort" "strings" "sync" @@ -21,7 +22,6 @@ import ( "cmd/go/internal/load" "cmd/go/internal/modload" "cmd/go/internal/mvs" - "cmd/go/internal/par" "cmd/go/internal/search" "cmd/go/internal/work" @@ -725,18 +725,8 @@ func runGet(ctx context.Context, cmd *base.Command, args []string) { // reported. A map from module paths to queries is returned, which includes // queries and modOnly. func runQueries(ctx context.Context, cache map[querySpec]*query, queries []*query, modOnly map[string]*query) map[string]*query { - var lookup par.Work - for _, q := range queries { - if cached := cache[q.querySpec]; cached != nil { - *q = *cached - } else { - cache[q.querySpec] = q - lookup.Add(q) - } - } - lookup.Do(10, func(item interface{}) { - q := item.(*query) + runQuery := func(q *query) { if q.vers == "none" { // Wait for downgrade step. q.m = module.Version{Path: q.path, Version: "none"} @@ -747,7 +737,32 @@ func runQueries(ctx context.Context, cache map[querySpec]*query, queries []*quer base.Errorf("go get %s: %v", q.arg, err) } q.m = m - }) + } + + type token struct{} + sem := make(chan token, runtime.GOMAXPROCS(0)) + for _, q := range queries { + if cached := cache[q.querySpec]; cached != nil { + *q = *cached + } else { + sem <- token{} + go func(q *query) { + runQuery(q) + <-sem + }(q) + } + } + + // Fill semaphore channel to wait for goroutines to finish. + for n := cap(sem); n > 0; n-- { + sem <- token{} + } + + // Add to cache after concurrent section to avoid races... + for _, q := range queries { + cache[q.querySpec] = q + } + base.ExitIfErrors() byPath := make(map[string]*query) diff --git a/src/cmd/go/internal/modload/list.go b/src/cmd/go/internal/modload/list.go index 4768516e90..8db4d64706 100644 --- a/src/cmd/go/internal/modload/list.go +++ b/src/cmd/go/internal/modload/list.go @@ -9,12 +9,12 @@ import ( "errors" "fmt" "os" + "runtime" "strings" "cmd/go/internal/base" "cmd/go/internal/cfg" "cmd/go/internal/modinfo" - "cmd/go/internal/par" "cmd/go/internal/search" "golang.org/x/mod/module" @@ -22,24 +22,35 @@ import ( func ListModules(ctx context.Context, args []string, listU, listVersions bool) []*modinfo.ModulePublic { mods := listModules(ctx, args, listVersions) + + type token struct{} + sem := make(chan token, runtime.GOMAXPROCS(0)) if listU || listVersions { - var work par.Work for _, m := range mods { - work.Add(m) + add := func(m *modinfo.ModulePublic) { + sem <- token{} + go func() { + if listU { + addUpdate(m) + } + if listVersions { + addVersions(m) + } + <-sem + }() + } + + add(m) if m.Replace != nil { - work.Add(m.Replace) + add(m.Replace) } } - work.Do(10, func(item interface{}) { - m := item.(*modinfo.ModulePublic) - if listU { - addUpdate(m) - } - if listVersions { - addVersions(m) - } - }) } + // Fill semaphore channel to wait for all tasks to finish. + for n := cap(sem); n > 0; n-- { + sem <- token{} + } + return mods } -- 2.50.0