]> Cypherpunks repositories - gostls13.git/commitdiff
net/http: fix Transport data race, double cancel panic, cancel error message
authorBrad Fitzpatrick <bradfitz@golang.org>
Tue, 7 Apr 2015 13:50:34 +0000 (15:50 +0200)
committerBrad Fitzpatrick <bradfitz@golang.org>
Mon, 20 Apr 2015 20:34:43 +0000 (20:34 +0000)
Fixes #9496
Fixes #9946
Fixes #10474
Fixes #10405

Change-Id: I4e65f1706e46499811d9ebf4ad6d83a5dfb2ddaa
Reviewed-on: https://go-review.googlesource.com/8550
Reviewed-by: Daniel Morsing <daniel.morsing@gmail.com>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>

src/net/http/client_test.go
src/net/http/export_test.go
src/net/http/main_test.go
src/net/http/npn_test.go
src/net/http/request_test.go
src/net/http/serve_test.go
src/net/http/transport.go
src/net/http/transport_test.go

index 18645ff00d65e1ec98879f11825f323958e644e1..dc499a90b6ac59cf9cd275329de5d5e393cc95fa 100644 (file)
@@ -334,6 +334,7 @@ var echoCookiesRedirectHandler = HandlerFunc(func(w ResponseWriter, r *Request)
 })
 
 func TestClientSendsCookieFromJar(t *testing.T) {
+       defer afterTest(t)
        tr := &recordingTransport{}
        client := &Client{Transport: tr}
        client.Jar = &TestJar{perURL: make(map[string][]*Cookie)}
index e0bbc8067047597d0d0c8ee9e4555cca56b89468..69757bdca6518a6f649fd03753d975df3d700ae8 100644 (file)
@@ -110,3 +110,5 @@ func SetPendingDialHooks(before, after func()) {
 var ExportServerNewConn = (*Server).newConn
 
 var ExportCloseWriteAndWait = (*conn).closeWriteAndWait
+
+var ExportErrRequestCanceled = errRequestCanceled
index c7407df707cea6c7457ee1b9719f5b4030e04a65..12eea6f0e119c1eaf5d9b955493ffb6d271fcf84 100644 (file)
@@ -56,17 +56,21 @@ func goroutineLeaked() bool {
                // not counting goroutines for leakage in -short mode
                return false
        }
-       gs := interestingGoroutines()
 
-       n := 0
-       stackCount := make(map[string]int)
-       for _, g := range gs {
-               stackCount[g]++
-               n++
-       }
-
-       if n == 0 {
-               return false
+       var stackCount map[string]int
+       for i := 0; i < 5; i++ {
+               n := 0
+               stackCount = make(map[string]int)
+               gs := interestingGoroutines()
+               for _, g := range gs {
+                       stackCount[g]++
+                       n++
+               }
+               if n == 0 {
+                       return false
+               }
+               // Wait for goroutines to schedule and die off:
+               time.Sleep(100 * time.Millisecond)
        }
        fmt.Fprintf(os.Stderr, "Too many goroutines running after net/http test(s).\n")
        for stack, count := range stackCount {
index 98b8930d0648b654a614d9ba042f6724c542bea4..e2e911d3dd13a778d92bc87ca57817243076bd48 100644 (file)
@@ -6,6 +6,7 @@ package http_test
 
 import (
        "bufio"
+       "bytes"
        "crypto/tls"
        "fmt"
        "io"
@@ -17,6 +18,7 @@ import (
 )
 
 func TestNextProtoUpgrade(t *testing.T) {
+       defer afterTest(t)
        ts := httptest.NewUnstartedServer(HandlerFunc(func(w ResponseWriter, r *Request) {
                fmt.Fprintf(w, "path=%s,proto=", r.URL.Path)
                if r.TLS != nil {
@@ -38,12 +40,12 @@ func TestNextProtoUpgrade(t *testing.T) {
        ts.StartTLS()
        defer ts.Close()
 
-       tr := newTLSTransport(t, ts)
-       defer tr.CloseIdleConnections()
-       c := &Client{Transport: tr}
-
        // Normal request, without NPN.
        {
+               tr := newTLSTransport(t, ts)
+               defer tr.CloseIdleConnections()
+               c := &Client{Transport: tr}
+
                res, err := c.Get(ts.URL)
                if err != nil {
                        t.Fatal(err)
@@ -60,11 +62,17 @@ func TestNextProtoUpgrade(t *testing.T) {
        // Request to an advertised but unhandled NPN protocol.
        // Server will hang up.
        {
-               tr.CloseIdleConnections()
+               tr := newTLSTransport(t, ts)
                tr.TLSClientConfig.NextProtos = []string{"unhandled-proto"}
-               _, err := c.Get(ts.URL)
+               defer tr.CloseIdleConnections()
+               c := &Client{Transport: tr}
+
+               res, err := c.Get(ts.URL)
                if err == nil {
-                       t.Errorf("expected error on unhandled-proto request")
+                       defer res.Body.Close()
+                       var buf bytes.Buffer
+                       res.Write(&buf)
+                       t.Errorf("expected error on unhandled-proto request; got: %s", buf.Bytes())
                }
        }
 
index 9228d50ef74e7e39f0f87aa5d30f6acc2da2ecad..a518b004496b46f49af83e222ca4ca41af2abf50 100644 (file)
@@ -178,6 +178,7 @@ func TestParseMultipartForm(t *testing.T) {
 }
 
 func TestRedirect(t *testing.T) {
+       defer afterTest(t)
        ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
                switch r.URL.Path {
                case "/":
index c21b57b57e4d7a00e47de181d293c7bfcc28c459..6e1b3ed0251f07f90cb308a4b8cba3a68d5c7da4 100644 (file)
@@ -146,6 +146,7 @@ func (ht handlerTest) rawResponse(req string) string {
 }
 
 func TestConsumingBodyOnNextConn(t *testing.T) {
+       defer afterTest(t)
        conn := new(testConn)
        for i := 0; i < 2; i++ {
                conn.readBuf.Write([]byte(
index 2528b8e1cd43bd00f702f7e66e484def8382195a..79a418765b790f1972c9c1448b1dad776816f878 100644 (file)
@@ -279,6 +279,7 @@ func (t *Transport) CloseIdleConnections() {
 func (t *Transport) CancelRequest(req *Request) {
        t.reqMu.Lock()
        cancel := t.reqCanceler[req]
+       delete(t.reqCanceler, req)
        t.reqMu.Unlock()
        if cancel != nil {
                cancel()
@@ -805,6 +806,7 @@ type persistConn struct {
        numExpectedResponses int
        closed               bool // whether conn has been closed
        broken               bool // an error has happened on this connection; marked broken so it's not reused.
+       canceled             bool // whether this conn was broken due a CancelRequest
        // mutateHeaderFunc is an optional func to modify extra
        // headers on each outbound request before it's written. (the
        // original Request given to RoundTrip is not modified)
@@ -819,8 +821,18 @@ func (pc *persistConn) isBroken() bool {
        return b
 }
 
+// isCanceled reports whether this connection was closed due to CancelRequest.
+func (pc *persistConn) isCanceled() bool {
+       pc.lk.Lock()
+       defer pc.lk.Unlock()
+       return pc.canceled
+}
+
 func (pc *persistConn) cancelRequest() {
-       pc.conn.Close()
+       pc.lk.Lock()
+       defer pc.lk.Unlock()
+       pc.canceled = true
+       pc.closeLocked()
 }
 
 var remoteSideClosedFunc func(error) bool // or nil to use default
@@ -836,8 +848,13 @@ func remoteSideClosed(err error) bool {
 }
 
 func (pc *persistConn) readLoop() {
-       alive := true
+       // eofc is used to block http.Handler goroutines reading from Response.Body
+       // at EOF until this goroutines has (potentially) added the connection
+       // back to the idle pool.
+       eofc := make(chan struct{})
+       defer close(eofc) // unblock reader on errors
 
+       alive := true
        for alive {
                pb, err := pc.br.Peek(1)
 
@@ -895,22 +912,22 @@ func (pc *persistConn) readLoop() {
                        alive = false
                }
 
-               var waitForBodyRead chan bool
+               var waitForBodyRead chan bool // channel is nil when there's no body
                if hasBody {
                        waitForBodyRead = make(chan bool, 2)
                        resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error {
-                               // Sending false here sets alive to
-                               // false and closes the connection
-                               // below.
                                waitForBodyRead <- false
                                return nil
                        }
-                       resp.Body.(*bodyEOFSignal).fn = func(err error) {
-                               waitForBodyRead <- alive &&
-                                       err == nil &&
-                                       !pc.sawEOF &&
-                                       pc.wroteRequest() &&
-                                       pc.t.putIdleConn(pc)
+                       resp.Body.(*bodyEOFSignal).fn = func(err error) error {
+                               isEOF := err == io.EOF
+                               waitForBodyRead <- isEOF
+                               if isEOF {
+                                       <-eofc // see comment at top
+                               } else if err != nil && pc.isCanceled() {
+                                       return errRequestCanceled
+                               }
+                               return err
                        }
                }
 
@@ -924,28 +941,33 @@ func (pc *persistConn) readLoop() {
                // on the response channel before erroring out.
                rc.ch <- responseAndError{resp, err}
 
-               if alive && !hasBody {
-                       alive = !pc.sawEOF &&
-                               pc.wroteRequest() &&
-                               pc.t.putIdleConn(pc)
-               }
-
-               // Wait for the just-returned response body to be fully consumed
-               // before we race and peek on the underlying bufio reader.
-               if waitForBodyRead != nil {
+               if hasBody {
+                       // To avoid a race, wait for the just-returned
+                       // response body to be fully consumed before peek on
+                       // the underlying bufio reader.
                        select {
-                       case alive = <-waitForBodyRead:
+                       case bodyEOF := <-waitForBodyRead:
+                               pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
+                               alive = alive &&
+                                       bodyEOF &&
+                                       !pc.sawEOF &&
+                                       pc.wroteRequest() &&
+                                       pc.t.putIdleConn(pc)
+                               if bodyEOF {
+                                       eofc <- struct{}{}
+                               }
                        case <-pc.closech:
                                alive = false
                        }
-               }
-
-               pc.t.setReqCanceler(rc.req, nil)
-
-               if !alive {
-                       pc.close()
+               } else {
+                       pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
+                       alive = alive &&
+                               !pc.sawEOF &&
+                               pc.wroteRequest() &&
+                               pc.t.putIdleConn(pc)
                }
        }
+       pc.close()
 }
 
 func (pc *persistConn) writeLoop() {
@@ -1035,6 +1057,7 @@ func (e *httpError) Temporary() bool { return true }
 
 var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true}
 var errClosed error = &httpError{err: "net/http: transport closed before response was received"}
+var errRequestCanceled = errors.New("net/http: request canceled")
 
 var testHookPersistConnClosedGotRes func() // nil except for tests
 
@@ -1183,16 +1206,18 @@ func canonicalAddr(url *url.URL) string {
 
 // bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most
 // once, right before its final (error-producing) Read or Close call
-// returns. If earlyCloseFn is non-nil and Close is called before
-// io.EOF is seen, earlyCloseFn is called instead of fn, and its
-// return value is the return value from Close.
+// returns. fn should return the new error to return from Read or Close.
+//
+// If earlyCloseFn is non-nil and Close is called before io.EOF is
+// seen, earlyCloseFn is called instead of fn, and its return value is
+// the return value from Close.
 type bodyEOFSignal struct {
        body         io.ReadCloser
-       mu           sync.Mutex   // guards following 4 fields
-       closed       bool         // whether Close has been called
-       rerr         error        // sticky Read error
-       fn           func(error)  // error will be nil on Read io.EOF
-       earlyCloseFn func() error // optional alt Close func used if io.EOF not seen
+       mu           sync.Mutex        // guards following 4 fields
+       closed       bool              // whether Close has been called
+       rerr         error             // sticky Read error
+       fn           func(error) error // err will be nil on Read io.EOF
+       earlyCloseFn func() error      // optional alt Close func used if io.EOF not seen
 }
 
 func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
@@ -1213,7 +1238,7 @@ func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
                if es.rerr == nil {
                        es.rerr = err
                }
-               es.condfn(err)
+               err = es.condfn(err)
        }
        return
 }
@@ -1229,20 +1254,17 @@ func (es *bodyEOFSignal) Close() error {
                return es.earlyCloseFn()
        }
        err := es.body.Close()
-       es.condfn(err)
-       return err
+       return es.condfn(err)
 }
 
 // caller must hold es.mu.
-func (es *bodyEOFSignal) condfn(err error) {
+func (es *bodyEOFSignal) condfn(err error) error {
        if es.fn == nil {
-               return
-       }
-       if err == io.EOF {
-               err = nil
+               return err
        }
-       es.fn(err)
+       err = es.fn(err)
        es.fn = nil
+       return err
 }
 
 // gzipReader wraps a response body so it can lazily
index b56defdc078156762701cc8cb76c102c9c85e1b5..e2c926d5004cc9310db21b9fedb1addbec85d6d1 100644 (file)
@@ -505,12 +505,17 @@ func TestStressSurpriseServerCloses(t *testing.T) {
 
        tr := &Transport{DisableKeepAlives: false}
        c := &Client{Transport: tr}
+       defer tr.CloseIdleConnections()
 
        // Do a bunch of traffic from different goroutines. Send to activityc
        // after each request completes, regardless of whether it failed.
+       // If these are too high, OS X exhausts its emphemeral ports
+       // and hangs waiting for them to transition TCP states. That's
+       // not what we want to test.  TODO(bradfitz): use an io.Pipe
+       // dialer for this test instead?
        const (
-               numClients    = 50
-               reqsPerClient = 250
+               numClients    = 20
+               reqsPerClient = 25
        )
        activityc := make(chan bool)
        for i := 0; i < numClients; i++ {
@@ -1371,8 +1376,8 @@ func TestTransportCancelRequest(t *testing.T) {
        body, err := ioutil.ReadAll(res.Body)
        d := time.Since(t0)
 
-       if err == nil {
-               t.Error("expected an error reading the body")
+       if err != ExportErrRequestCanceled {
+               t.Errorf("Body.Read error = %v; want errRequestCanceled", err)
        }
        if string(body) != "Hello" {
                t.Errorf("Body = %q; want Hello", body)
@@ -1382,7 +1387,7 @@ func TestTransportCancelRequest(t *testing.T) {
        }
        // Verify no outstanding requests after readLoop/writeLoop
        // goroutines shut down.
-       for tries := 3; tries > 0; tries-- {
+       for tries := 5; tries > 0; tries-- {
                n := tr.NumPendingRequestsForTesting()
                if n == 0 {
                        break
@@ -1431,6 +1436,7 @@ func TestTransportCancelRequestInDial(t *testing.T) {
 
        eventLog.Printf("canceling")
        tr.CancelRequest(req)
+       tr.CancelRequest(req) // used to panic on second call
 
        select {
        case <-gotres:
@@ -2321,6 +2327,47 @@ func TestTransportResponseCloseRace(t *testing.T) {
        }
 }
 
+// Test for issue 10474
+func TestTransportResponseCancelRace(t *testing.T) {
+       defer afterTest(t)
+
+       ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
+               // important that this response has a body.
+               var b [1024]byte
+               w.Write(b[:])
+       }))
+       defer ts.Close()
+
+       tr := &Transport{}
+       defer tr.CloseIdleConnections()
+
+       req, err := NewRequest("GET", ts.URL, nil)
+       if err != nil {
+               t.Fatal(err)
+       }
+       res, err := tr.RoundTrip(req)
+       if err != nil {
+               t.Fatal(err)
+       }
+       // If we do an early close, Transport just throws the connection away and
+       // doesn't reuse it. In order to trigger the bug, it has to reuse the connection
+       // so read the body
+       if _, err := io.Copy(ioutil.Discard, res.Body); err != nil {
+               t.Fatal(err)
+       }
+
+       req2, err := NewRequest("GET", ts.URL, nil)
+       if err != nil {
+               t.Fatal(err)
+       }
+       tr.CancelRequest(req)
+       res, err = tr.RoundTrip(req2)
+       if err != nil {
+               t.Fatal(err)
+       }
+       res.Body.Close()
+}
+
 func wantBody(res *http.Response, err error, want string) error {
        if err != nil {
                return err