]> Cypherpunks repositories - gostls13.git/commitdiff
net/http: retry idempotent HTTP reqs on dead reused conns
authorBlake Gentry <blakesgentry@gmail.com>
Thu, 22 Jan 2015 23:58:25 +0000 (15:58 -0800)
committerBrad Fitzpatrick <bradfitz@golang.org>
Tue, 1 Dec 2015 20:40:31 +0000 (20:40 +0000)
If we try to reuse a connection that the server is in the process of
closing, we may end up successfully writing out our request (or a
portion of our request) only to find a connection error when we try to
read from (or finish writing to) the socket. This manifests as an EOF
returned from the Transport's RoundTrip.

The issue, among others, is described in #4677.

This change follows some of the Chromium guidelines for retrying
idempotent requests only when the connection has been already been used
successfully and no header data has yet been received for the response.

As part of this change, an unexported error was defined for
errMissingHost, which was previously defined inline. errMissingHost is
the only non-network error returned from a Request's Write() method.

Additionally, this breaks TestLinuxSendfile because its test server
explicitly triggers the type of scenario this change is meant to retry
on. Because that test server stops accepting conns on the test listener
before the retry, the test would time out. To fix this, the test was
altered to use a non-idempotent test type (POST).

Change-Id: I1ca630b944f0ed7ec1d3d46056a50fb959481a16
Reviewed-on: https://go-review.googlesource.com/3210
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>

src/net/http/export_test.go
src/net/http/fs_test.go
src/net/http/request.go
src/net/http/transport.go
src/net/http/transport_test.go

index 6e6d1cd7250e1d13f4d1d1a0f3b9492333798e31..0dc39a359f7629966d2a51c690478e6dafa8483f 100644 (file)
@@ -125,6 +125,11 @@ func SetPendingDialHooks(before, after func()) {
        prePendingDial, postPendingDial = before, after
 }
 
+// SetRetriedHook sets the hook that runs when an idempotent retry occurs.
+func SetRetriedHook(hook func()) {
+       retried = hook
+}
+
 var ExportServerNewConn = (*Server).newConn
 
 var ExportCloseWriteAndWait = (*conn).closeWriteAndWait
index abfd39377b8d6a3d5002ffac70d773aa7418bf91..7550c552d18f3e1af149ba11fa4a70eaa51fd9d6 100644 (file)
@@ -946,7 +946,7 @@ func TestLinuxSendfile(t *testing.T) {
        res.Body.Close()
 
        // Force child to exit cleanly.
-       Get(fmt.Sprintf("http://%s/quit", ln.Addr()))
+       Post(fmt.Sprintf("http://%s/quit", ln.Addr()), "", nil)
        child.Wait()
 
        rx := regexp.MustCompile(`sendfile(64)?\(\d+,\s*\d+,\s*NULL,\s*\d+\)\s*=\s*\d+\s*\n`)
index 67976da103f359b7b86f2b356f8c7ed121057eaf..c85713c42c60e6302717ae77bf042424df2d3e0c 100644 (file)
@@ -367,6 +367,10 @@ func (r *Request) WriteProxy(w io.Writer) error {
        return r.write(w, true, nil, nil)
 }
 
+// errMissingHost is returned by Write when there is no Host or URL present in
+// the Request.
+var errMissingHost = errors.New("http: Request.Write on Request with no Host or URL set")
+
 // extraHeaders may be nil
 // waitForContinue may be nil
 func (req *Request) write(w io.Writer, usingProxy bool, extraHeaders Header, waitForContinue func() bool) error {
@@ -377,7 +381,7 @@ func (req *Request) write(w io.Writer, usingProxy bool, extraHeaders Header, wai
        host := cleanHost(req.Host)
        if host == "" {
                if req.URL == nil {
-                       return errors.New("http: Request.Write on Request with no Host or URL set")
+                       return errMissingHost
                }
                host = cleanHost(req.URL.Host)
        }
@@ -1042,3 +1046,11 @@ func (r *Request) closeBody() {
                r.Body.Close()
        }
 }
+
+func (r *Request) isReplayable() bool {
+       return r.Body == nil &&
+               (r.Method == "GET" ||
+                       r.Method == "HEAD" ||
+                       r.Method == "OPTIONS" ||
+                       r.Method == "TRACE")
+}
index 63abd377e97049a4e25ad04f63879e3e09adbcad..5ba072007f850c98ff30b15376027f3c9a64df26 100644 (file)
@@ -244,28 +244,79 @@ func (t *Transport) RoundTrip(req *Request) (*Response, error) {
                req.closeBody()
                return nil, errors.New("http: no Host in request URL")
        }
-       treq := &transportRequest{Request: req}
-       cm, err := t.connectMethodForRequest(treq)
-       if err != nil {
-               req.closeBody()
-               return nil, err
+
+       for {
+               // treq gets modified by roundTrip, so we need to recreate for each retry.
+               treq := &transportRequest{Request: req}
+               cm, err := t.connectMethodForRequest(treq)
+               if err != nil {
+                       req.closeBody()
+                       return nil, err
+               }
+
+               // Get the cached or newly-created connection to either the
+               // host (for http or https), the http proxy, or the http proxy
+               // pre-CONNECTed to https server.  In any case, we'll be ready
+               // to send it requests.
+               pconn, err := t.getConn(req, cm)
+               if err != nil {
+                       t.setReqCanceler(req, nil)
+                       req.closeBody()
+                       return nil, err
+               }
+
+               var resp *Response
+               if pconn.alt != nil {
+                       // HTTP/2 path.
+                       resp, err = pconn.alt.RoundTrip(req)
+               } else {
+                       resp, err = pconn.roundTrip(treq)
+               }
+               if err == nil {
+                       return resp, nil
+               }
+               if err := checkTransportResend(err, req, pconn); err != nil {
+                       return nil, err
+               }
+               if retried != nil {
+                       retried()
+               }
        }
+}
 
-       // Get the cached or newly-created connection to either the
-       // host (for http or https), the http proxy, or the http proxy
-       // pre-CONNECTed to https server.  In any case, we'll be ready
-       // to send it requests.
-       pconn, err := t.getConn(req, cm)
-       if err != nil {
-               t.setReqCanceler(req, nil)
-               req.closeBody()
-               return nil, err
+// checkTransportResend checks whether a failed HTTP request can be
+// resent on a new connection. The non-nil input error is the error from
+// roundTrip, which might be wrapped in a beforeRespHeaderError error.
+//
+// The return value is err or the unwrapped error inside a
+// beforeRespHeaderError.
+func checkTransportResend(err error, req *Request, pconn *persistConn) error {
+       brhErr, ok := err.(beforeRespHeaderError)
+       if !ok {
+               return err
        }
-       if pconn.alt != nil {
-               // HTTP/2 path.
-               return pconn.alt.RoundTrip(req)
+       err = brhErr.error // unwrap the custom error in case we return it
+       if err != errMissingHost && pconn.isReused() && req.isReplayable() {
+               // If we try to reuse a connection that the server is in the process of
+               // closing, we may end up successfully writing out our request (or a
+               // portion of our request) only to find a connection error when we try to
+               // read from (or finish writing to) the socket.
+
+               // There can be a race between the socket pool checking whether a socket
+               // is still connected, receiving the FIN, and sending/reading data on a
+               // reused socket. If we receive the FIN between the connectedness check
+               // and writing/reading from the socket, we may first learn the socket is
+               // disconnected when we get a ERR_SOCKET_NOT_CONNECTED. This will most
+               // likely happen when trying to retrieve its IP address. See
+               // http://crbug.com/105824 for more details.
+
+               // We resend a request only if we reused a keep-alive connection and did
+               // not yet receive any header data. This automatically prevents an
+               // infinite resend loop because we'll run out of the cached keep-alive
+               // connections eventually.
+               return nil
        }
-       return pconn.roundTrip(treq)
+       return err
 }
 
 // ErrSkipAltProtocol is a sentinel error value defined by Transport.RegisterProtocol.
@@ -408,6 +459,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
        if max == 0 {
                max = DefaultMaxIdleConnsPerHost
        }
+       pconn.markReused()
        t.idleMu.Lock()
 
        waitingDialer := t.idleConnCh[key]
@@ -539,7 +591,7 @@ func (t *Transport) dial(network, addr string) (c net.Conn, err error) {
 }
 
 // Testing hooks:
-var prePendingDial, postPendingDial func()
+var prePendingDial, postPendingDial, retried func()
 
 // getConn dials and creates a new persistConn to the target as
 // specified in the connectMethod.  This includes doing a proxy CONNECT
@@ -879,6 +931,7 @@ type persistConn struct {
        closed               bool // whether conn has been closed
        broken               bool // an error has happened on this connection; marked broken so it's not reused.
        canceled             bool // whether this conn was broken due a CancelRequest
+       reused               bool // whether conn has had successful request/response and is being reused.
        // mutateHeaderFunc is an optional func to modify extra
        // headers on each outbound request before it's written. (the
        // original Request given to RoundTrip is not modified)
@@ -900,6 +953,14 @@ func (pc *persistConn) isCanceled() bool {
        return pc.canceled
 }
 
+// isReused reports whether this connection is in a known broken state.
+func (pc *persistConn) isReused() bool {
+       pc.lk.Lock()
+       r := pc.reused
+       pc.lk.Unlock()
+       return r
+}
+
 func (pc *persistConn) cancelRequest() {
        pc.lk.Lock()
        defer pc.lk.Unlock()
@@ -922,6 +983,9 @@ func (pc *persistConn) readLoop() {
        alive := true
        for alive {
                pb, err := pc.br.Peek(1)
+               if err != nil {
+                       err = beforeRespHeaderError{err}
+               }
 
                pc.lk.Lock()
                if pc.numExpectedResponses == 0 {
@@ -1004,7 +1068,13 @@ func (pc *persistConn) readLoop() {
                        // on t from this persistConn while the Transport
                        // potentially spins up a different persistConn for the
                        // caller's subsequent request.
-                       pc.t.setReqCanceler(rc.req, nil)
+                       //
+                       // If this request will be retried, don't clear the reqCanceler
+                       // yet or else roundTrip thinks it's been canceled.
+                       if err == nil ||
+                               checkTransportResend(err, rc.req, pc) != nil {
+                               pc.t.setReqCanceler(rc.req, nil)
+                       }
                }
 
                pc.lk.Lock()
@@ -1186,6 +1256,12 @@ var (
        testHookReadLoopBeforeNextRead  func()
 )
 
+// beforeRespHeaderError is used to indicate when an IO error has occurred before
+// any header data was received.
+type beforeRespHeaderError struct {
+       error
+}
+
 func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
        if hook := testHookEnterRoundTrip; hook != nil {
                hook()
@@ -1267,7 +1343,7 @@ WaitResponse:
                                }
                        }
                        if err != nil {
-                               re = responseAndError{nil, err}
+                               re = responseAndError{nil, beforeRespHeaderError{err}}
                                pc.close()
                                break WaitResponse
                        }
@@ -1293,7 +1369,7 @@ WaitResponse:
                                        fn()
                                }
                        default:
-                               re = responseAndError{err: errClosed}
+                               re = responseAndError{err: beforeRespHeaderError{errClosed}}
                                if pc.isCanceled() {
                                        re = responseAndError{err: errRequestCanceled}
                                }
@@ -1326,6 +1402,14 @@ func (pc *persistConn) markBroken() {
        pc.broken = true
 }
 
+// markReused marks this connection as having been successfully used for a
+// request and response.
+func (pc *persistConn) markReused() {
+       pc.lk.Lock()
+       pc.reused = true
+       pc.lk.Unlock()
+}
+
 func (pc *persistConn) close() {
        pc.lk.Lock()
        defer pc.lk.Unlock()
index 17cac856970c1b68732d8ecb0f6f1d94612a2032..eaed3a484d1b3c126c300a4ca7b3352020c3c07d 100644 (file)
@@ -2410,6 +2410,80 @@ type closerFunc func() error
 
 func (f closerFunc) Close() error { return f() }
 
+// Issue 4677. If we try to reuse a connection that the server is in the
+// process of closing, we may end up successfully writing out our request (or a
+// portion of our request) only to find a connection error when we try to read
+// from (or finish writing to) the socket.
+//
+// NOTE: we resend a request only if the request is idempotent, we reused a
+// keep-alive connection, and we haven't yet received any header data.  This
+// automatically prevents an infinite resend loop because we'll run out of the
+// cached keep-alive connections eventually.
+func TestRetryIdempotentRequestsOnError(t *testing.T) {
+       defer afterTest(t)
+
+       ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
+       }))
+       defer ts.Close()
+
+       tr := &Transport{}
+       c := &Client{Transport: tr}
+
+       const N = 2
+       retryc := make(chan struct{}, N)
+       SetRetriedHook(func() {
+               retryc <- struct{}{}
+       })
+       defer SetRetriedHook(nil)
+
+       for n := 0; n < 100; n++ {
+               // open 2 conns
+               errc := make(chan error, N)
+               for i := 0; i < N; i++ {
+                       // start goroutines, send on errc
+                       go func() {
+                               res, err := c.Get(ts.URL)
+                               if err == nil {
+                                       res.Body.Close()
+                               }
+                               errc <- err
+                       }()
+               }
+               for i := 0; i < N; i++ {
+                       if err := <-errc; err != nil {
+                               t.Fatal(err)
+                       }
+               }
+
+               ts.CloseClientConnections()
+               for i := 0; i < N; i++ {
+                       go func() {
+                               res, err := c.Get(ts.URL)
+                               if err == nil {
+                                       res.Body.Close()
+                               }
+                               errc <- err
+                       }()
+               }
+
+               for i := 0; i < N; i++ {
+                       if err := <-errc; err != nil {
+                               t.Fatal(err)
+                       }
+               }
+               for i := 0; i < N; i++ {
+                       select {
+                       case <-retryc:
+                               // we triggered a retry, test was successful
+                               t.Logf("finished after %d runs\n", n)
+                               return
+                       default:
+                       }
+               }
+       }
+       t.Fatal("did not trigger any retries")
+}
+
 // Issue 6981
 func TestTransportClosesBodyOnError(t *testing.T) {
        defer afterTest(t)