]> Cypherpunks repositories - gostls13.git/commitdiff
net/http: tighten protocol between Transport.roundTrip and persistConn.readLoop
authorBrad Fitzpatrick <bradfitz@golang.org>
Tue, 5 Jan 2016 19:40:25 +0000 (19:40 +0000)
committerBrad Fitzpatrick <bradfitz@golang.org>
Tue, 5 Jan 2016 22:33:16 +0000 (22:33 +0000)
In debugging the flaky test in #13825, I discovered that my previous
change to tighten and simplify the communication protocol between
Transport.roundTrip and persistConn.readLoop in
https://golang.org/cl/17890 wasn't complete.

This change simplifies it further: the buffered-vs-unbuffered
complexity goes away, and we no longer need to re-try channel reads in
the select case. It was trying to prioritize channels in the case that
two were readable in the select. (it was only failing in the race builder
because the race builds randomize select scheduling)

The problem was that in the bodyless response case we had to return
the idle connection before replying to roundTrip. But putIdleConn
previously both added it to the free list (which we wanted), but also
closed the connection, which made the caller goroutine
(Transport.roundTrip) have two readable cases: pc.closech, and the
response. We guarded against similar conditions in the caller's select
for two readable channels, but such a fix wasn't possible here, and would
be overly complicated.

Instead, switch to unbuffered channels. The unbuffered channels were only
to prevent goroutine leaks, so address that differently: add a "callerGone"
channel closed by the caller on exit, and select on that during any unbuffered
sends.

As part of the fix, split putIdleConn into two halves: a part that
just returns to the freelist, and a part that also closes. Update the
four callers to the variants each wanted.

Incidentally, the connections were closing on return to the pool due
to MaxIdleConnsPerHost (somewhat related: #13801), but this bug
could've manifested for plenty of other reasons.

Fixes #13825

Change-Id: I6fa7136e2c52909d57a22ea4b74d0155fdf0e6fa
Reviewed-on: https://go-review.googlesource.com/18282
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Andrew Gerrand <adg@golang.org>
src/net/http/export_test.go
src/net/http/transport.go

index f3939db27d09e2380fa61363711bdae7f2ef607b..514d02b2a3a4fae5974802b80e8c1ce9d41233cf 100644 (file)
@@ -121,12 +121,12 @@ func (t *Transport) RequestIdleConnChForTesting() {
 
 func (t *Transport) PutIdleTestConn() bool {
        c, _ := net.Pipe()
-       return t.putIdleConn(&persistConn{
+       return t.tryPutIdleConn(&persistConn{
                t:        t,
                conn:     c,                   // dummy
                closech:  make(chan struct{}), // so it can be closed
                cacheKey: connectMethodKey{"", "http", "example.com"},
-       })
+       }) == nil
 }
 
 // All test hooks must be non-nil so they can be called directly,
index e610f9706be75909f41a3519a97fb40e72946c17..67b291504115b2f7002e8ccf8d437261400fb4aa 100644 (file)
@@ -365,7 +365,7 @@ func (t *Transport) CloseIdleConnections() {
        t.idleMu.Unlock()
        for _, conns := range m {
                for _, pconn := range conns {
-                       pconn.close()
+                       pconn.close(errCloseIdleConns)
                }
        }
 }
@@ -450,17 +450,34 @@ func (cm *connectMethod) proxyAuth() string {
        return ""
 }
 
-// putIdleConn adds pconn to the list of idle persistent connections awaiting
+// error values for debugging and testing, not seen by users.
+var (
+       errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
+       errConnBroken         = errors.New("http: putIdleConn: connection is in bad state")
+       errWantIdle           = errors.New("http: putIdleConn: CloseIdleConnections was called")
+       errTooManyIdle        = errors.New("http: putIdleConn: too many idle connections")
+       errCloseIdleConns     = errors.New("http: CloseIdleConnections called")
+       errReadLoopExiting    = errors.New("http: persistConn.readLoop exiting")
+       errServerClosedIdle   = errors.New("http: server closed idle conn")
+)
+
+func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
+       if err := t.tryPutIdleConn(pconn); err != nil {
+               pconn.close(err)
+       }
+}
+
+// tryPutIdleConn adds pconn to the list of idle persistent connections awaiting
 // a new request.
-// If pconn is no longer needed or not in a good state, putIdleConn
-// returns false.
-func (t *Transport) putIdleConn(pconn *persistConn) bool {
+// If pconn is no longer needed or not in a good state, tryPutIdleConn returns
+// an error explaining why it wasn't registered.
+// tryPutIdleConn does not close pconn. Use putOrCloseIdleConn instead for that.
+func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
        if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
-               pconn.close()
-               return false
+               return errKeepAlivesDisabled
        }
        if pconn.isBroken() {
-               return false
+               return errConnBroken
        }
        key := pconn.cacheKey
        max := t.MaxIdleConnsPerHost
@@ -479,7 +496,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
                // first). Chrome calls this socket late binding.  See
                // https://insouciant.org/tech/connection-management-in-chromium/
                t.idleMu.Unlock()
-               return true
+               return nil
        default:
                if waitingDialer != nil {
                        // They had populated this, but their dial won
@@ -489,16 +506,14 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
        }
        if t.wantIdle {
                t.idleMu.Unlock()
-               pconn.close()
-               return false
+               return errWantIdle
        }
        if t.idleConn == nil {
                t.idleConn = make(map[connectMethodKey][]*persistConn)
        }
        if len(t.idleConn[key]) >= max {
                t.idleMu.Unlock()
-               pconn.close()
-               return false
+               return errTooManyIdle
        }
        for _, exist := range t.idleConn[key] {
                if exist == pconn {
@@ -507,7 +522,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
        }
        t.idleConn[key] = append(t.idleConn[key], pconn)
        t.idleMu.Unlock()
-       return true
+       return nil
 }
 
 // getIdleConnCh returns a channel to receive and return idle
@@ -626,7 +641,7 @@ func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error
                testHookPrePendingDial()
                go func() {
                        if v := <-dialc; v.err == nil {
-                               t.putIdleConn(v.pc)
+                               t.putOrCloseIdleConn(v.pc)
                        }
                        testHookPostPendingDial()
                }()
@@ -929,10 +944,10 @@ type persistConn struct {
 
        lk                   sync.Mutex // guards following fields
        numExpectedResponses int
-       closed               bool // whether conn has been closed
-       broken               bool // an error has happened on this connection; marked broken so it's not reused.
-       canceled             bool // whether this conn was broken due a CancelRequest
-       reused               bool // whether conn has had successful request/response and is being reused.
+       closed               error // set non-nil when conn is closed, before closech is closed
+       broken               bool  // an error has happened on this connection; marked broken so it's not reused.
+       canceled             bool  // whether this conn was broken due a CancelRequest
+       reused               bool  // whether conn has had successful request/response and is being reused.
        // mutateHeaderFunc is an optional func to modify extra
        // headers on each outbound request before it's written. (the
        // original Request given to RoundTrip is not modified)
@@ -966,11 +981,20 @@ func (pc *persistConn) cancelRequest() {
        pc.lk.Lock()
        defer pc.lk.Unlock()
        pc.canceled = true
-       pc.closeLocked()
+       pc.closeLocked(errRequestCanceled)
 }
 
 func (pc *persistConn) readLoop() {
-       defer pc.close()
+       closeErr := errReadLoopExiting // default value, if not changed below
+       defer func() { pc.close(closeErr) }()
+
+       tryPutIdleConn := func() bool {
+               if err := pc.t.tryPutIdleConn(pc); err != nil {
+                       closeErr = err
+                       return false
+               }
+               return true
+       }
 
        // eofc is used to block caller goroutines reading from Response.Body
        // at EOF until this goroutines has (potentially) added the connection
@@ -1016,7 +1040,11 @@ func (pc *persistConn) readLoop() {
                        if checkTransportResend(err, rc.req, pc) != nil {
                                pc.t.setReqCanceler(rc.req, nil)
                        }
-                       rc.ch <- responseAndError{err: err}
+                       select {
+                       case rc.ch <- responseAndError{err: err}:
+                       case <-rc.callerGone:
+                               return
+                       }
                        return
                }
 
@@ -1035,8 +1063,6 @@ func (pc *persistConn) readLoop() {
 
                if !hasBody {
                        pc.t.setReqCanceler(rc.req, nil)
-                       resc := make(chan *Response)        // unbuffered matters; see below
-                       rc.ch <- responseAndError{ch: resc} // buffered send
 
                        // Put the idle conn back into the pool before we send the response
                        // so if they process it quickly and make another request, they'll
@@ -1047,9 +1073,13 @@ func (pc *persistConn) readLoop() {
                        alive = alive &&
                                !pc.sawEOF &&
                                pc.wroteRequest() &&
-                               pc.t.putIdleConn(pc)
+                               tryPutIdleConn()
 
-                       resc <- resp // unbuffered send
+                       select {
+                       case rc.ch <- responseAndError{res: resp}:
+                       case <-rc.callerGone:
+                               return
+                       }
 
                        // Now that they've read from the unbuffered channel, they're safely
                        // out of the select that also waits on this goroutine to die, so
@@ -1079,7 +1109,11 @@ func (pc *persistConn) readLoop() {
                        return err
                }
 
-               rc.ch <- responseAndError{r: resp}
+               select {
+               case rc.ch <- responseAndError{res: resp}:
+               case <-rc.callerGone:
+                       return
+               }
 
                // Before looping back to the top of this function and peeking on
                // the bufio.Reader, wait for the caller goroutine to finish
@@ -1091,7 +1125,7 @@ func (pc *persistConn) readLoop() {
                                bodyEOF &&
                                !pc.sawEOF &&
                                pc.wroteRequest() &&
-                               pc.t.putIdleConn(pc)
+                               tryPutIdleConn()
                        if bodyEOF {
                                eofc <- struct{}{}
                        }
@@ -1116,14 +1150,19 @@ func maybeUngzipResponse(resp *Response) {
 }
 
 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
-       if pc.closed {
+       if pc.closed != nil {
                return
        }
        if n := pc.br.Buffered(); n > 0 {
                buf, _ := pc.br.Peek(n)
                log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
        }
-       pc.closeLocked()
+       if peekErr == io.EOF {
+               // common case.
+               pc.closeLocked(errServerClosedIdle)
+       } else {
+               pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %v", peekErr))
+       }
 }
 
 // readResponse reads an HTTP response (or two, in the case of "Expect:
@@ -1227,25 +1266,13 @@ func (pc *persistConn) wroteRequest() bool {
 // responseAndError is how the goroutine reading from an HTTP/1 server
 // communicates with the goroutine doing the RoundTrip.
 type responseAndError struct {
-       ch  chan *Response // if non-nil, res should be read from here
-       r   *Response      // else use this response (see res method)
+       res *Response // else use this response (see res method)
        err error
 }
 
-func (re responseAndError) res() *Response {
-       switch {
-       case re.err != nil:
-               return nil
-       case re.ch != nil:
-               return <-re.ch
-       default:
-               return re.r
-       }
-}
-
 type requestAndChan struct {
        req *Request
-       ch  chan responseAndError
+       ch  chan responseAndError // unbuffered; always send in select on callerGone
 
        // did the Transport (as opposed to the client code) add an
        // Accept-Encoding gzip header? only if it we set it do
@@ -1257,6 +1284,8 @@ type requestAndChan struct {
        // the server responds 100 Continue, readLoop send a value
        // to writeLoop via this chan.
        continueCh chan<- struct{}
+
+       callerGone <-chan struct{} // closed when roundTrip caller has returned
 }
 
 // A writeRequest is sent by the readLoop's goroutine to the
@@ -1283,7 +1312,7 @@ func (e *httpError) Timeout() bool   { return e.timeout }
 func (e *httpError) Temporary() bool { return true }
 
 var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true}
-var errClosed error = &httpError{err: "net/http: transport closed before response was received"}
+var errClosed error = &httpError{err: "net/http: server closed connection before response was received"}
 var errRequestCanceled = errors.New("net/http: request canceled")
 
 func nop() {}
@@ -1309,7 +1338,7 @@ type beforeRespHeaderError struct {
 func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
        testHookEnterRoundTrip()
        if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
-               pc.t.putIdleConn(pc)
+               pc.t.putOrCloseIdleConn(pc)
                return nil, errRequestCanceled
        }
        pc.lk.Lock()
@@ -1355,14 +1384,23 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err
                req.extraHeaders().Set("Connection", "close")
        }
 
+       gone := make(chan struct{})
+       defer close(gone)
+
        // Write the request concurrently with waiting for a response,
        // in case the server decides to reply before reading our full
        // request body.
        writeErrCh := make(chan error, 1)
        pc.writech <- writeRequest{req, writeErrCh, continueCh}
 
-       resc := make(chan responseAndError, 1)
-       pc.reqch <- requestAndChan{req.Request, resc, requestedGzip, continueCh}
+       resc := make(chan responseAndError)
+       pc.reqch <- requestAndChan{
+               req:        req.Request,
+               ch:         resc,
+               addedGzip:  requestedGzip,
+               continueCh: continueCh,
+               callerGone: gone,
+       }
 
        var re responseAndError
        var respHeaderTimer <-chan time.Time
@@ -1372,22 +1410,9 @@ WaitResponse:
                testHookWaitResLoop()
                select {
                case err := <-writeErrCh:
-                       if isNetWriteError(err) {
-                               // Issue 11745. If we failed to write the request
-                               // body, it's possible the server just heard enough
-                               // and already wrote to us. Prioritize the server's
-                               // response over returning a body write error.
-                               select {
-                               case re = <-resc:
-                                       pc.close()
-                                       break WaitResponse
-                               case <-time.After(50 * time.Millisecond):
-                                       // Fall through.
-                               }
-                       }
                        if err != nil {
                                re = responseAndError{err: beforeRespHeaderError{err}}
-                               pc.close()
+                               pc.close(fmt.Errorf("write error: %v", err))
                                break WaitResponse
                        }
                        if d := pc.t.ResponseHeaderTimeout; d > 0 {
@@ -1400,12 +1425,12 @@ WaitResponse:
                        if pc.isCanceled() {
                                err = errRequestCanceled
                        } else {
-                               err = beforeRespHeaderError{errClosed}
+                               err = beforeRespHeaderError{fmt.Errorf("net/http: HTTP/1 transport connection broken: %v", pc.closed)}
                        }
                        re = responseAndError{err: err}
                        break WaitResponse
                case <-respHeaderTimer:
-                       pc.close()
+                       pc.close(errTimeout)
                        re = responseAndError{err: errTimeout}
                        break WaitResponse
                case re = <-resc:
@@ -1419,7 +1444,10 @@ WaitResponse:
        if re.err != nil {
                pc.t.setReqCanceler(req.Request, nil)
        }
-       return re.res(), re.err
+       if (re.res == nil) == (re.err == nil) {
+               panic("internal error: exactly one of res or err should be set")
+       }
+       return re.res, re.err
 }
 
 // markBroken marks a connection as broken (so it's not reused).
@@ -1439,17 +1467,25 @@ func (pc *persistConn) markReused() {
        pc.lk.Unlock()
 }
 
-func (pc *persistConn) close() {
+// close closes the underlying TCP connection and closes
+// the pc.closech channel.
+//
+// The provided err is only for testing and debugging; in normal
+// circumstances it should never be seen by users.
+func (pc *persistConn) close(err error) {
        pc.lk.Lock()
        defer pc.lk.Unlock()
-       pc.closeLocked()
+       pc.closeLocked(err)
 }
 
-func (pc *persistConn) closeLocked() {
+func (pc *persistConn) closeLocked(err error) {
+       if err == nil {
+               panic("nil error")
+       }
        pc.broken = true
-       if !pc.closed {
+       if pc.closed == nil {
                pc.conn.Close()
-               pc.closed = true
+               pc.closed = err
                close(pc.closech)
        }
        pc.mutateHeaderFunc = nil