]> Cypherpunks repositories - gostls13.git/commitdiff
cmd/go: add a GODEBUG to limit the number of concurrent network connections
authorBryan C. Mills <bcmills@google.com>
Fri, 3 Mar 2023 14:48:19 +0000 (14:48 +0000)
committerGopher Robot <gobot@golang.org>
Fri, 19 May 2023 02:47:12 +0000 (02:47 +0000)
I implemented this in order to debug connection failures on a
new-to-me VM development environment that uses Cloud NAT. It doesn't
directly fix the bug, but perhaps folks will find it useful to
diagnose port-exhaustion-related flakiness in other environments.

For #52545.

Change-Id: Icd3f13dcf62e718560c4f4a965a4df7c1bd785ce
Reviewed-on: https://go-review.googlesource.com/c/go/+/473277
Run-TryBot: Bryan Mills <bcmills@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Michael Matloob <matloob@golang.org>
Auto-Submit: Bryan Mills <bcmills@google.com>

src/cmd/go/go_test.go
src/cmd/go/internal/base/limit.go [new file with mode: 0644]
src/cmd/go/internal/modfetch/codehost/git.go
src/cmd/go/internal/modfetch/codehost/svn.go
src/cmd/go/internal/modfetch/codehost/vcs.go
src/cmd/go/internal/vcs/vcs.go
src/cmd/go/internal/web/http.go
src/cmd/go/script_test.go
src/cmd/go/scriptconds_test.go

index 291f51244727d281341232230a5eec8fdf5b32ec..da7336da9eb27b063f5ac4779ddee3dc11ac5759 100644 (file)
@@ -19,6 +19,7 @@ import (
        "io"
        "io/fs"
        "log"
+       "math"
        "os"
        "os/exec"
        "path/filepath"
@@ -29,6 +30,7 @@ import (
        "testing"
        "time"
 
+       "cmd/go/internal/base"
        "cmd/go/internal/cache"
        "cmd/go/internal/cfg"
        "cmd/go/internal/robustio"
@@ -61,6 +63,12 @@ var (
        cgoEnabled           string // raw value from 'go env CGO_ENABLED'
 )
 
+// netTestSem is a semaphore limiting the number of tests that may use the
+// external network in parallel. If non-nil, it contains one buffer slot per
+// test (send to acquire), with a low enough limit that the overall number of
+// connections (summed across subprocesses) stays at or below base.NetLimit.
+var netTestSem chan struct{}
+
 var exeSuffix string = func() string {
        if runtime.GOOS == "windows" {
                return ".exe"
@@ -282,6 +290,17 @@ func TestMain(m *testing.M) {
                }
        }
 
+       if n, limited := base.NetLimit(); limited && n > 0 {
+               // Split the network limit into chunks, so that each parallel script can
+               // have one chunk. We want to run as many parallel scripts as possible, but
+               // also want to give each script as high a limit as possible.
+               // We arbitrarily split by sqrt(n) to try to balance those two goals.
+               netTestLimit := int(math.Sqrt(float64(n)))
+               netTestSem = make(chan struct{}, netTestLimit)
+               reducedLimit := fmt.Sprintf(",%s=%d", base.NetLimitGodebug.Name(), n/netTestLimit)
+               os.Setenv("GODEBUG", os.Getenv("GODEBUG")+reducedLimit)
+       }
+
        // Don't let these environment variables confuse the test.
        os.Setenv("GOENV", "off")
        os.Unsetenv("GOFLAGS")
@@ -369,6 +388,7 @@ type testgoData struct {
        tempdir        string
        ran            bool
        inParallel     bool
+       hasNet         bool
        stdout, stderr bytes.Buffer
        execDir        string // dir for tg.run
 }
@@ -411,6 +431,9 @@ func (tg *testgoData) parallel() {
        if tg.ran {
                tg.t.Fatal("internal testsuite error: call to parallel after run")
        }
+       if tg.hasNet {
+               tg.t.Fatal("internal testsuite error: call to parallel after acquireNet")
+       }
        for _, e := range tg.env {
                if strings.HasPrefix(e, "GOROOT=") || strings.HasPrefix(e, "GOPATH=") || strings.HasPrefix(e, "GOBIN=") {
                        val := e[strings.Index(e, "=")+1:]
@@ -423,6 +446,25 @@ func (tg *testgoData) parallel() {
        tg.t.Parallel()
 }
 
+// acquireNet skips t if the network is unavailable, and otherwise acquires a
+// netTestSem token for t to be released at the end of the test.
+//
+// t.Parallel must not be called after acquireNet.
+func (tg *testgoData) acquireNet() {
+       tg.t.Helper()
+       if tg.hasNet {
+               return
+       }
+
+       testenv.MustHaveExternalNetwork(tg.t)
+       if netTestSem != nil {
+               netTestSem <- struct{}{}
+               tg.t.Cleanup(func() { <-netTestSem })
+       }
+       tg.setenv("TESTGONETWORK", "")
+       tg.hasNet = true
+}
+
 // pwd returns the current directory.
 func (tg *testgoData) pwd() string {
        tg.t.Helper()
@@ -444,9 +486,6 @@ func (tg *testgoData) sleep() {
 // command.
 func (tg *testgoData) setenv(name, val string) {
        tg.t.Helper()
-       if tg.inParallel && (name == "GOROOT" || name == "GOPATH" || name == "GOBIN") && (strings.HasPrefix(val, "testdata") || strings.HasPrefix(val, "./testdata")) {
-               tg.t.Fatalf("internal testsuite error: call to setenv with testdata (%s=%s) after parallel", name, val)
-       }
        tg.unsetenv(name)
        tg.env = append(tg.env, name+"="+val)
 }
@@ -455,7 +494,10 @@ func (tg *testgoData) setenv(name, val string) {
 func (tg *testgoData) unsetenv(name string) {
        if tg.env == nil {
                tg.env = append([]string(nil), os.Environ()...)
-               tg.env = append(tg.env, "GO111MODULE=off")
+               tg.env = append(tg.env, "GO111MODULE=off", "TESTGONETWORK=panic")
+               if testing.Short() {
+                       tg.env = append(tg.env, "TESTGOVCS=panic")
+               }
        }
        for i, v := range tg.env {
                if strings.HasPrefix(v, name+"=") {
@@ -1012,12 +1054,13 @@ func TestNewReleaseRebuildsStalePackagesInGOPATH(t *testing.T) {
 
 // cmd/go: custom import path checking should not apply to Go packages without import comment.
 func TestIssue10952(t *testing.T) {
-       testenv.MustHaveExternalNetwork(t)
        testenv.MustHaveExecPath(t, "git")
 
        tg := testgo(t)
        defer tg.cleanup()
        tg.parallel()
+       tg.acquireNet()
+
        tg.tempDir("src")
        tg.setenv("GOPATH", tg.path("."))
        const importPath = "github.com/zombiezen/go-get-issue-10952"
@@ -1029,12 +1072,13 @@ func TestIssue10952(t *testing.T) {
 
 // Test git clone URL that uses SCP-like syntax and custom import path checking.
 func TestIssue11457(t *testing.T) {
-       testenv.MustHaveExternalNetwork(t)
        testenv.MustHaveExecPath(t, "git")
 
        tg := testgo(t)
        defer tg.cleanup()
        tg.parallel()
+       tg.acquireNet()
+
        tg.tempDir("src")
        tg.setenv("GOPATH", tg.path("."))
        const importPath = "rsc.io/go-get-issue-11457"
@@ -1054,12 +1098,13 @@ func TestIssue11457(t *testing.T) {
 }
 
 func TestGetGitDefaultBranch(t *testing.T) {
-       testenv.MustHaveExternalNetwork(t)
        testenv.MustHaveExecPath(t, "git")
 
        tg := testgo(t)
        defer tg.cleanup()
        tg.parallel()
+       tg.acquireNet()
+
        tg.tempDir("src")
        tg.setenv("GOPATH", tg.path("."))
 
@@ -1395,12 +1440,13 @@ func TestDefaultGOPATH(t *testing.T) {
 }
 
 func TestDefaultGOPATHGet(t *testing.T) {
-       testenv.MustHaveExternalNetwork(t)
        testenv.MustHaveExecPath(t, "git")
 
        tg := testgo(t)
        defer tg.cleanup()
        tg.parallel()
+       tg.acquireNet()
+
        tg.setenv("GOPATH", "")
        tg.tempDir("home")
        tg.setenv(homeEnvName(), tg.path("home"))
diff --git a/src/cmd/go/internal/base/limit.go b/src/cmd/go/internal/base/limit.go
new file mode 100644 (file)
index 0000000..b4160bd
--- /dev/null
@@ -0,0 +1,84 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package base
+
+import (
+       "fmt"
+       "internal/godebug"
+       "runtime"
+       "strconv"
+       "sync"
+)
+
+var NetLimitGodebug = godebug.New("#cmdgonetlimit")
+
+// NetLimit returns the limit on concurrent network operations
+// configured by GODEBUG=cmdgonetlimit, if any.
+//
+// A limit of 0 (indicated by 0, true) means that network operations should not
+// be allowed.
+func NetLimit() (int, bool) {
+       netLimitOnce.Do(func() {
+               s := NetLimitGodebug.Value()
+               if s == "" {
+                       return
+               }
+
+               n, err := strconv.Atoi(s)
+               if err != nil {
+                       Fatalf("invalid %s: %v", NetLimitGodebug.Name(), err)
+               }
+               if n < 0 {
+                       // Treat negative values as unlimited.
+                       return
+               }
+               netLimitSem = make(chan struct{}, n)
+       })
+
+       return cap(netLimitSem), netLimitSem != nil
+}
+
+// AcquireNet acquires a semaphore token for a network operation.
+func AcquireNet() (release func(), err error) {
+       hasToken := false
+       if n, ok := NetLimit(); ok {
+               if n == 0 {
+                       return nil, fmt.Errorf("network disabled by %v=%v", NetLimitGodebug.Name(), NetLimitGodebug.Value())
+               }
+               netLimitSem <- struct{}{}
+               hasToken = true
+       }
+
+       checker := new(netTokenChecker)
+       runtime.SetFinalizer(checker, (*netTokenChecker).panicUnreleased)
+
+       return func() {
+               if checker.released {
+                       panic("internal error: net token released twice")
+               }
+               checker.released = true
+               if hasToken {
+                       <-netLimitSem
+               }
+               runtime.SetFinalizer(checker, nil)
+       }, nil
+}
+
+var (
+       netLimitOnce sync.Once
+       netLimitSem  chan struct{}
+)
+
+type netTokenChecker struct {
+       released bool
+       // We want to use a finalizer to check that all acquired tokens are returned,
+       // so we arbitrarily pad the tokens with a string to defeat the runtime's
+       // “tiny allocator”.
+       unusedAvoidTinyAllocator string
+}
+
+func (c *netTokenChecker) panicUnreleased() {
+       panic("internal error: net token acquired but not released")
+}
index 60ec616c69678379555f9c805555aad28d5baf2f..d1a18a8d58925e8f32755e71a40590521065a976 100644 (file)
@@ -24,6 +24,7 @@ import (
        "sync"
        "time"
 
+       "cmd/go/internal/base"
        "cmd/go/internal/lockedfile"
        "cmd/go/internal/par"
        "cmd/go/internal/web"
@@ -241,7 +242,14 @@ func (r *gitRepo) loadRefs(ctx context.Context) (map[string]string, error) {
                // The git protocol sends all known refs and ls-remote filters them on the client side,
                // so we might as well record both heads and tags in one shot.
                // Most of the time we only care about tags but sometimes we care about heads too.
+               release, err := base.AcquireNet()
+               if err != nil {
+                       r.refsErr = err
+                       return
+               }
                out, gitErr := Run(ctx, r.dir, "git", "ls-remote", "-q", r.remote)
+               release()
+
                if gitErr != nil {
                        if rerr, ok := gitErr.(*RunError); ok {
                                if bytes.Contains(rerr.Stderr, []byte("fatal: could not read Username")) {
@@ -531,7 +539,14 @@ func (r *gitRepo) stat(ctx context.Context, rev string) (info *RevInfo, err erro
                        ref = hash
                        refspec = hash + ":refs/dummy"
                }
-               _, err := Run(ctx, r.dir, "git", "fetch", "-f", "--depth=1", r.remote, refspec)
+
+               release, err := base.AcquireNet()
+               if err != nil {
+                       return nil, err
+               }
+               _, err = Run(ctx, r.dir, "git", "fetch", "-f", "--depth=1", r.remote, refspec)
+               release()
+
                if err == nil {
                        return r.statLocal(ctx, rev, ref)
                }
@@ -566,6 +581,12 @@ func (r *gitRepo) fetchRefsLocked(ctx context.Context) error {
                // golang.org/issue/34266 and
                // https://github.com/git/git/blob/4c86140027f4a0d2caaa3ab4bd8bfc5ce3c11c8a/transport.c#L1303-L1309.)
 
+               release, err := base.AcquireNet()
+               if err != nil {
+                       return err
+               }
+               defer release()
+
                if _, err := Run(ctx, r.dir, "git", "fetch", "-f", r.remote, "refs/heads/*:refs/heads/*", "refs/tags/*:refs/tags/*"); err != nil {
                        return err
                }
index fe5b74f71b42cdc7dc0018983103fa7660c0b598..9c1c10097bb437486f394b5a0304d487732edf78 100644 (file)
@@ -15,6 +15,8 @@ import (
        "path/filepath"
        "strconv"
        "time"
+
+       "cmd/go/internal/base"
 )
 
 func svnParseStat(rev, out string) (*RevInfo, error) {
@@ -66,6 +68,10 @@ func svnReadZip(ctx context.Context, dst io.Writer, workDir, rev, subdir, remote
                remotePath += "/" + subdir
        }
 
+       release, err := base.AcquireNet()
+       if err != nil {
+               return err
+       }
        out, err := Run(ctx, workDir, []string{
                "svn", "list",
                "--non-interactive",
@@ -75,6 +81,7 @@ func svnReadZip(ctx context.Context, dst io.Writer, workDir, rev, subdir, remote
                "--revision", rev,
                "--", remotePath,
        })
+       release()
        if err != nil {
                return err
        }
@@ -98,6 +105,10 @@ func svnReadZip(ctx context.Context, dst io.Writer, workDir, rev, subdir, remote
        }
        defer os.RemoveAll(exportDir) // best-effort
 
+       release, err = base.AcquireNet()
+       if err != nil {
+               return err
+       }
        _, err = Run(ctx, workDir, []string{
                "svn", "export",
                "--non-interactive",
@@ -112,6 +123,7 @@ func svnReadZip(ctx context.Context, dst io.Writer, workDir, rev, subdir, remote
                "--", remotePath,
                exportDir,
        })
+       release()
        if err != nil {
                return err
        }
index 3c0c24a8914041867498b4e03a080a493cd79614..5bd100556b59c762c27766cb595c25be8fc3ebbd 100644 (file)
@@ -19,6 +19,7 @@ import (
        "sync"
        "time"
 
+       "cmd/go/internal/base"
        "cmd/go/internal/lockedfile"
        "cmd/go/internal/par"
        "cmd/go/internal/str"
@@ -109,7 +110,14 @@ func newVCSRepo(ctx context.Context, vcs, remote string) (Repo, error) {
        defer unlock()
 
        if _, err := os.Stat(filepath.Join(r.dir, "."+vcs)); err != nil {
-               if _, err := Run(ctx, r.dir, cmd.init(r.remote)); err != nil {
+               release, err := base.AcquireNet()
+               if err != nil {
+                       return nil, err
+               }
+               _, err = Run(ctx, r.dir, cmd.init(r.remote))
+               release()
+
+               if err != nil {
                        os.RemoveAll(r.dir)
                        return nil, err
                }
@@ -355,7 +363,13 @@ func (r *vcsRepo) Stat(ctx context.Context, rev string) (*RevInfo, error) {
 
 func (r *vcsRepo) fetch(ctx context.Context) {
        if len(r.cmd.fetch) > 0 {
+               release, err := base.AcquireNet()
+               if err != nil {
+                       r.fetchErr = err
+                       return
+               }
                _, r.fetchErr = Run(ctx, r.dir, r.cmd.fetch)
+               release()
        }
 }
 
index 2ef115da3160f25545b587dd8e41a8a72f5f9ec2..c65dd0f624a1b65476899348f40e3cacf35c3a6c 100644 (file)
@@ -22,6 +22,7 @@ import (
        "sync"
        "time"
 
+       "cmd/go/internal/base"
        "cmd/go/internal/cfg"
        "cmd/go/internal/search"
        "cmd/go/internal/str"
@@ -719,12 +720,24 @@ func (v *Cmd) Ping(scheme, repo string) error {
        }
        os.MkdirAll(dir, 0777) // Ignore errors — if unsuccessful, the command will likely fail.
 
+       release, err := base.AcquireNet()
+       if err != nil {
+               return err
+       }
+       defer release()
+
        return v.runVerboseOnly(dir, v.PingCmd, "scheme", scheme, "repo", repo)
 }
 
 // Create creates a new copy of repo in dir.
 // The parent of dir must exist; dir must not.
 func (v *Cmd) Create(dir, repo string) error {
+       release, err := base.AcquireNet()
+       if err != nil {
+               return err
+       }
+       defer release()
+
        for _, cmd := range v.CreateCmd {
                if err := v.run(filepath.Dir(dir), cmd, "dir", dir, "repo", repo); err != nil {
                        return err
@@ -735,6 +748,12 @@ func (v *Cmd) Create(dir, repo string) error {
 
 // Download downloads any new changes for the repo in dir.
 func (v *Cmd) Download(dir string) error {
+       release, err := base.AcquireNet()
+       if err != nil {
+               return err
+       }
+       defer release()
+
        for _, cmd := range v.DownloadCmd {
                if err := v.run(dir, cmd); err != nil {
                        return err
@@ -780,6 +799,12 @@ func (v *Cmd) TagSync(dir, tag string) error {
                }
        }
 
+       release, err := base.AcquireNet()
+       if err != nil {
+               return err
+       }
+       defer release()
+
        if tag == "" && v.TagSyncDefault != nil {
                for _, cmd := range v.TagSyncDefault {
                        if err := v.run(dir, cmd); err != nil {
index e7935ea18427708885aa4f95ed1f955fcd818d58..76b767c751e4e43ee78448ad8e0564d49e941714 100644 (file)
@@ -15,6 +15,7 @@ import (
        "crypto/tls"
        "errors"
        "fmt"
+       "io"
        "mime"
        "net"
        "net/http"
@@ -24,6 +25,7 @@ import (
        "time"
 
        "cmd/go/internal/auth"
+       "cmd/go/internal/base"
        "cmd/go/internal/cfg"
        "cmd/internal/browser"
 )
@@ -193,6 +195,11 @@ func get(security SecurityMode, url *urlpkg.URL) (*Response, error) {
                        req.URL.Host = t.ToHost
                }
 
+               release, err := base.AcquireNet()
+               if err != nil {
+                       return nil, nil, err
+               }
+
                var res *http.Response
                if security == Insecure && url.Scheme == "https" { // fail earlier
                        res, err = impatientInsecureHTTPClient.Do(req)
@@ -204,6 +211,17 @@ func get(security SecurityMode, url *urlpkg.URL) (*Response, error) {
                                res, err = securityPreservingDefaultClient.Do(req)
                        }
                }
+
+               if res == nil || res.Body == nil {
+                       release()
+               } else {
+                       body := res.Body
+                       res.Body = hookCloser{
+                               ReadCloser: body,
+                               afterClose: release,
+                       }
+               }
+
                return url, res, err
        }
 
@@ -358,3 +376,14 @@ func isLocalHost(u *urlpkg.URL) bool {
        }
        return false
 }
+
+type hookCloser struct {
+       io.ReadCloser
+       afterClose func()
+}
+
+func (c hookCloser) Close() error {
+       err := c.ReadCloser.Close()
+       c.afterClose()
+       return err
+}
index 072a2dfef547fa9409172feb72c331f288ea1854..e21e57002b311d95aaca7f0090d0fd396ba303c1 100644 (file)
@@ -116,7 +116,7 @@ func TestScript(t *testing.T) {
                                defer removeAll(workdir)
                        }
 
-                       s, err := script.NewState(ctx, workdir, env)
+                       s, err := script.NewState(tbContext(ctx, t), workdir, env)
                        if err != nil {
                                t.Fatal(err)
                        }
@@ -156,6 +156,23 @@ func TestScript(t *testing.T) {
        }
 }
 
+// testingTBKey is the Context key for a testing.TB.
+type testingTBKey struct{}
+
+// tbContext returns a Context derived from ctx and associated with t.
+func tbContext(ctx context.Context, t testing.TB) context.Context {
+       return context.WithValue(ctx, testingTBKey{}, t)
+}
+
+// tbFromContext returns the testing.TB associated with ctx, if any.
+func tbFromContext(ctx context.Context) (testing.TB, bool) {
+       t := ctx.Value(testingTBKey{})
+       if t == nil {
+               return nil, false
+       }
+       return t.(testing.TB), true
+}
+
 // initScriptState creates the initial directory structure in s for unpacking a
 // cmd/go script.
 func initScriptDirs(t testing.TB, s *script.State) {
@@ -216,6 +233,7 @@ func scriptEnv(srv *vcstest.Server, srvCertFile string) ([]string, error) {
                "TESTGO_VCSTEST_HOST=" + httpURL.Host,
                "TESTGO_VCSTEST_TLS_HOST=" + httpsURL.Host,
                "TESTGO_VCSTEST_CERT=" + srvCertFile,
+               "TESTGONETWORK=panic", // cleared by the [net] condition
                "GOSUMDB=" + testSumDBVerifierKey,
                "GONOPROXY=",
                "GONOSUMDB=",
@@ -239,6 +257,7 @@ func scriptEnv(srv *vcstest.Server, srvCertFile string) ([]string, error) {
                // Require all tests that use VCS commands to be skipped in short mode.
                env = append(env, "TESTGOVCS=panic")
        }
+
        if os.Getenv("CGO_ENABLED") != "" || runtime.GOOS != goHostOS || runtime.GOARCH != goHostArch {
                // If the actual CGO_ENABLED might not match the cmd/go default, set it
                // explicitly in the environment. Otherwise, leave it unset so that we also
index 3c893eeae561384e0ae4f5bfbfa7599d1206b4c5..641f69f312bf1f763794bd302b5450d1334bfe0d 100644 (file)
@@ -19,6 +19,7 @@ import (
        "runtime"
        "runtime/debug"
        "strings"
+       "sync"
 )
 
 func scriptConditions() map[string]script.Cond {
@@ -107,6 +108,8 @@ func hasBuildmode(s *script.State, mode string) (bool, error) {
        return platform.BuildModeSupported(runtime.Compiler, mode, GOOS, GOARCH), nil
 }
 
+var scriptNetEnabled sync.Map // testing.TB → already enabled
+
 func hasNet(s *script.State, host string) (bool, error) {
        if !testenv.HasExternalNetwork() {
                return false, nil
@@ -115,6 +118,26 @@ func hasNet(s *script.State, host string) (bool, error) {
        // TODO(bcmills): Add a flag or environment variable to allow skipping tests
        // for specific hosts and/or skipping all net tests except for specific hosts.
 
+       t, ok := tbFromContext(s.Context())
+       if !ok {
+               return false, errors.New("script Context unexpectedly missing testing.TB key")
+       }
+
+       if netTestSem != nil {
+               // When the number of external network connections is limited, we limit the
+               // number of net tests that can run concurrently so that the overall number
+               // of network connections won't exceed the limit.
+               _, dup := scriptNetEnabled.LoadOrStore(t, true)
+               if !dup {
+                       // Acquire a net token for this test until the test completes.
+                       netTestSem <- struct{}{}
+                       t.Cleanup(func() {
+                               <-netTestSem
+                               scriptNetEnabled.Delete(t)
+                       })
+               }
+       }
+
        // Since we have confirmed that the network is available,
        // allow cmd/go to use it.
        s.Setenv("TESTGONETWORK", "")