From: Brad Fitzpatrick Date: Tue, 7 Apr 2015 13:50:34 +0000 (+0200) Subject: net/http: fix Transport data race, double cancel panic, cancel error message X-Git-Tag: go1.5beta1~1007 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=b016eba489820b9091e4f39610a84f697d8eb0f9;p=gostls13.git net/http: fix Transport data race, double cancel panic, cancel error message Fixes #9496 Fixes #9946 Fixes #10474 Fixes #10405 Change-Id: I4e65f1706e46499811d9ebf4ad6d83a5dfb2ddaa Reviewed-on: https://go-review.googlesource.com/8550 Reviewed-by: Daniel Morsing Run-TryBot: Brad Fitzpatrick --- diff --git a/src/net/http/client_test.go b/src/net/http/client_test.go index 18645ff00d..dc499a90b6 100644 --- a/src/net/http/client_test.go +++ b/src/net/http/client_test.go @@ -334,6 +334,7 @@ var echoCookiesRedirectHandler = HandlerFunc(func(w ResponseWriter, r *Request) }) func TestClientSendsCookieFromJar(t *testing.T) { + defer afterTest(t) tr := &recordingTransport{} client := &Client{Transport: tr} client.Jar = &TestJar{perURL: make(map[string][]*Cookie)} diff --git a/src/net/http/export_test.go b/src/net/http/export_test.go index e0bbc80670..69757bdca6 100644 --- a/src/net/http/export_test.go +++ b/src/net/http/export_test.go @@ -110,3 +110,5 @@ func SetPendingDialHooks(before, after func()) { var ExportServerNewConn = (*Server).newConn var ExportCloseWriteAndWait = (*conn).closeWriteAndWait + +var ExportErrRequestCanceled = errRequestCanceled diff --git a/src/net/http/main_test.go b/src/net/http/main_test.go index c7407df707..12eea6f0e1 100644 --- a/src/net/http/main_test.go +++ b/src/net/http/main_test.go @@ -56,17 +56,21 @@ func goroutineLeaked() bool { // not counting goroutines for leakage in -short mode return false } - gs := interestingGoroutines() - n := 0 - stackCount := make(map[string]int) - for _, g := range gs { - stackCount[g]++ - n++ - } - - if n == 0 { - return false + var stackCount map[string]int + for i := 0; i < 5; i++ { + n := 0 + stackCount = make(map[string]int) + gs := interestingGoroutines() + for _, g := range gs { + stackCount[g]++ + n++ + } + if n == 0 { + return false + } + // Wait for goroutines to schedule and die off: + time.Sleep(100 * time.Millisecond) } fmt.Fprintf(os.Stderr, "Too many goroutines running after net/http test(s).\n") for stack, count := range stackCount { diff --git a/src/net/http/npn_test.go b/src/net/http/npn_test.go index 98b8930d06..e2e911d3dd 100644 --- a/src/net/http/npn_test.go +++ b/src/net/http/npn_test.go @@ -6,6 +6,7 @@ package http_test import ( "bufio" + "bytes" "crypto/tls" "fmt" "io" @@ -17,6 +18,7 @@ import ( ) func TestNextProtoUpgrade(t *testing.T) { + defer afterTest(t) ts := httptest.NewUnstartedServer(HandlerFunc(func(w ResponseWriter, r *Request) { fmt.Fprintf(w, "path=%s,proto=", r.URL.Path) if r.TLS != nil { @@ -38,12 +40,12 @@ func TestNextProtoUpgrade(t *testing.T) { ts.StartTLS() defer ts.Close() - tr := newTLSTransport(t, ts) - defer tr.CloseIdleConnections() - c := &Client{Transport: tr} - // Normal request, without NPN. { + tr := newTLSTransport(t, ts) + defer tr.CloseIdleConnections() + c := &Client{Transport: tr} + res, err := c.Get(ts.URL) if err != nil { t.Fatal(err) @@ -60,11 +62,17 @@ func TestNextProtoUpgrade(t *testing.T) { // Request to an advertised but unhandled NPN protocol. // Server will hang up. { - tr.CloseIdleConnections() + tr := newTLSTransport(t, ts) tr.TLSClientConfig.NextProtos = []string{"unhandled-proto"} - _, err := c.Get(ts.URL) + defer tr.CloseIdleConnections() + c := &Client{Transport: tr} + + res, err := c.Get(ts.URL) if err == nil { - t.Errorf("expected error on unhandled-proto request") + defer res.Body.Close() + var buf bytes.Buffer + res.Write(&buf) + t.Errorf("expected error on unhandled-proto request; got: %s", buf.Bytes()) } } diff --git a/src/net/http/request_test.go b/src/net/http/request_test.go index 9228d50ef7..a518b00449 100644 --- a/src/net/http/request_test.go +++ b/src/net/http/request_test.go @@ -178,6 +178,7 @@ func TestParseMultipartForm(t *testing.T) { } func TestRedirect(t *testing.T) { + defer afterTest(t) ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) { switch r.URL.Path { case "/": diff --git a/src/net/http/serve_test.go b/src/net/http/serve_test.go index c21b57b57e..6e1b3ed025 100644 --- a/src/net/http/serve_test.go +++ b/src/net/http/serve_test.go @@ -146,6 +146,7 @@ func (ht handlerTest) rawResponse(req string) string { } func TestConsumingBodyOnNextConn(t *testing.T) { + defer afterTest(t) conn := new(testConn) for i := 0; i < 2; i++ { conn.readBuf.Write([]byte( diff --git a/src/net/http/transport.go b/src/net/http/transport.go index 2528b8e1cd..79a418765b 100644 --- a/src/net/http/transport.go +++ b/src/net/http/transport.go @@ -279,6 +279,7 @@ func (t *Transport) CloseIdleConnections() { func (t *Transport) CancelRequest(req *Request) { t.reqMu.Lock() cancel := t.reqCanceler[req] + delete(t.reqCanceler, req) t.reqMu.Unlock() if cancel != nil { cancel() @@ -805,6 +806,7 @@ type persistConn struct { numExpectedResponses int closed bool // whether conn has been closed broken bool // an error has happened on this connection; marked broken so it's not reused. + canceled bool // whether this conn was broken due a CancelRequest // mutateHeaderFunc is an optional func to modify extra // headers on each outbound request before it's written. (the // original Request given to RoundTrip is not modified) @@ -819,8 +821,18 @@ func (pc *persistConn) isBroken() bool { return b } +// isCanceled reports whether this connection was closed due to CancelRequest. +func (pc *persistConn) isCanceled() bool { + pc.lk.Lock() + defer pc.lk.Unlock() + return pc.canceled +} + func (pc *persistConn) cancelRequest() { - pc.conn.Close() + pc.lk.Lock() + defer pc.lk.Unlock() + pc.canceled = true + pc.closeLocked() } var remoteSideClosedFunc func(error) bool // or nil to use default @@ -836,8 +848,13 @@ func remoteSideClosed(err error) bool { } func (pc *persistConn) readLoop() { - alive := true + // eofc is used to block http.Handler goroutines reading from Response.Body + // at EOF until this goroutines has (potentially) added the connection + // back to the idle pool. + eofc := make(chan struct{}) + defer close(eofc) // unblock reader on errors + alive := true for alive { pb, err := pc.br.Peek(1) @@ -895,22 +912,22 @@ func (pc *persistConn) readLoop() { alive = false } - var waitForBodyRead chan bool + var waitForBodyRead chan bool // channel is nil when there's no body if hasBody { waitForBodyRead = make(chan bool, 2) resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error { - // Sending false here sets alive to - // false and closes the connection - // below. waitForBodyRead <- false return nil } - resp.Body.(*bodyEOFSignal).fn = func(err error) { - waitForBodyRead <- alive && - err == nil && - !pc.sawEOF && - pc.wroteRequest() && - pc.t.putIdleConn(pc) + resp.Body.(*bodyEOFSignal).fn = func(err error) error { + isEOF := err == io.EOF + waitForBodyRead <- isEOF + if isEOF { + <-eofc // see comment at top + } else if err != nil && pc.isCanceled() { + return errRequestCanceled + } + return err } } @@ -924,28 +941,33 @@ func (pc *persistConn) readLoop() { // on the response channel before erroring out. rc.ch <- responseAndError{resp, err} - if alive && !hasBody { - alive = !pc.sawEOF && - pc.wroteRequest() && - pc.t.putIdleConn(pc) - } - - // Wait for the just-returned response body to be fully consumed - // before we race and peek on the underlying bufio reader. - if waitForBodyRead != nil { + if hasBody { + // To avoid a race, wait for the just-returned + // response body to be fully consumed before peek on + // the underlying bufio reader. select { - case alive = <-waitForBodyRead: + case bodyEOF := <-waitForBodyRead: + pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool + alive = alive && + bodyEOF && + !pc.sawEOF && + pc.wroteRequest() && + pc.t.putIdleConn(pc) + if bodyEOF { + eofc <- struct{}{} + } case <-pc.closech: alive = false } - } - - pc.t.setReqCanceler(rc.req, nil) - - if !alive { - pc.close() + } else { + pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool + alive = alive && + !pc.sawEOF && + pc.wroteRequest() && + pc.t.putIdleConn(pc) } } + pc.close() } func (pc *persistConn) writeLoop() { @@ -1035,6 +1057,7 @@ func (e *httpError) Temporary() bool { return true } var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true} var errClosed error = &httpError{err: "net/http: transport closed before response was received"} +var errRequestCanceled = errors.New("net/http: request canceled") var testHookPersistConnClosedGotRes func() // nil except for tests @@ -1183,16 +1206,18 @@ func canonicalAddr(url *url.URL) string { // bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most // once, right before its final (error-producing) Read or Close call -// returns. If earlyCloseFn is non-nil and Close is called before -// io.EOF is seen, earlyCloseFn is called instead of fn, and its -// return value is the return value from Close. +// returns. fn should return the new error to return from Read or Close. +// +// If earlyCloseFn is non-nil and Close is called before io.EOF is +// seen, earlyCloseFn is called instead of fn, and its return value is +// the return value from Close. type bodyEOFSignal struct { body io.ReadCloser - mu sync.Mutex // guards following 4 fields - closed bool // whether Close has been called - rerr error // sticky Read error - fn func(error) // error will be nil on Read io.EOF - earlyCloseFn func() error // optional alt Close func used if io.EOF not seen + mu sync.Mutex // guards following 4 fields + closed bool // whether Close has been called + rerr error // sticky Read error + fn func(error) error // err will be nil on Read io.EOF + earlyCloseFn func() error // optional alt Close func used if io.EOF not seen } func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { @@ -1213,7 +1238,7 @@ func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { if es.rerr == nil { es.rerr = err } - es.condfn(err) + err = es.condfn(err) } return } @@ -1229,20 +1254,17 @@ func (es *bodyEOFSignal) Close() error { return es.earlyCloseFn() } err := es.body.Close() - es.condfn(err) - return err + return es.condfn(err) } // caller must hold es.mu. -func (es *bodyEOFSignal) condfn(err error) { +func (es *bodyEOFSignal) condfn(err error) error { if es.fn == nil { - return - } - if err == io.EOF { - err = nil + return err } - es.fn(err) + err = es.fn(err) es.fn = nil + return err } // gzipReader wraps a response body so it can lazily diff --git a/src/net/http/transport_test.go b/src/net/http/transport_test.go index b56defdc07..e2c926d500 100644 --- a/src/net/http/transport_test.go +++ b/src/net/http/transport_test.go @@ -505,12 +505,17 @@ func TestStressSurpriseServerCloses(t *testing.T) { tr := &Transport{DisableKeepAlives: false} c := &Client{Transport: tr} + defer tr.CloseIdleConnections() // Do a bunch of traffic from different goroutines. Send to activityc // after each request completes, regardless of whether it failed. + // If these are too high, OS X exhausts its emphemeral ports + // and hangs waiting for them to transition TCP states. That's + // not what we want to test. TODO(bradfitz): use an io.Pipe + // dialer for this test instead? const ( - numClients = 50 - reqsPerClient = 250 + numClients = 20 + reqsPerClient = 25 ) activityc := make(chan bool) for i := 0; i < numClients; i++ { @@ -1371,8 +1376,8 @@ func TestTransportCancelRequest(t *testing.T) { body, err := ioutil.ReadAll(res.Body) d := time.Since(t0) - if err == nil { - t.Error("expected an error reading the body") + if err != ExportErrRequestCanceled { + t.Errorf("Body.Read error = %v; want errRequestCanceled", err) } if string(body) != "Hello" { t.Errorf("Body = %q; want Hello", body) @@ -1382,7 +1387,7 @@ func TestTransportCancelRequest(t *testing.T) { } // Verify no outstanding requests after readLoop/writeLoop // goroutines shut down. - for tries := 3; tries > 0; tries-- { + for tries := 5; tries > 0; tries-- { n := tr.NumPendingRequestsForTesting() if n == 0 { break @@ -1431,6 +1436,7 @@ func TestTransportCancelRequestInDial(t *testing.T) { eventLog.Printf("canceling") tr.CancelRequest(req) + tr.CancelRequest(req) // used to panic on second call select { case <-gotres: @@ -2321,6 +2327,47 @@ func TestTransportResponseCloseRace(t *testing.T) { } } +// Test for issue 10474 +func TestTransportResponseCancelRace(t *testing.T) { + defer afterTest(t) + + ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) { + // important that this response has a body. + var b [1024]byte + w.Write(b[:]) + })) + defer ts.Close() + + tr := &Transport{} + defer tr.CloseIdleConnections() + + req, err := NewRequest("GET", ts.URL, nil) + if err != nil { + t.Fatal(err) + } + res, err := tr.RoundTrip(req) + if err != nil { + t.Fatal(err) + } + // If we do an early close, Transport just throws the connection away and + // doesn't reuse it. In order to trigger the bug, it has to reuse the connection + // so read the body + if _, err := io.Copy(ioutil.Discard, res.Body); err != nil { + t.Fatal(err) + } + + req2, err := NewRequest("GET", ts.URL, nil) + if err != nil { + t.Fatal(err) + } + tr.CancelRequest(req) + res, err = tr.RoundTrip(req2) + if err != nil { + t.Fatal(err) + } + res.Body.Close() +} + func wantBody(res *http.Response, err error, want string) error { if err != nil { return err