}
func (pc *persistConn) readLoop() {
- // eofc is used to block http.Handler goroutines reading from Response.Body
+ defer pc.close()
+
+ // eofc is used to block caller goroutines reading from Response.Body
// at EOF until this goroutines has (potentially) added the connection
// back to the idle pool.
eofc := make(chan struct{})
alive := true
for alive {
- pb, err := pc.br.Peek(1)
+ _, err := pc.br.Peek(1)
if err != nil {
err = beforeRespHeaderError{err}
}
pc.lk.Lock()
if pc.numExpectedResponses == 0 {
- if !pc.closed {
- pc.closeLocked()
- if len(pb) > 0 {
- buf, _ := pc.br.Peek(pc.br.Buffered())
- log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, err)
- }
- }
+ pc.readLoopPeekFailLocked(err)
pc.lk.Unlock()
return
}
var resp *Response
if err == nil {
- resp, err = ReadResponse(pc.br, rc.req)
- if err == nil {
- if rc.continueCh != nil {
- if resp.StatusCode == 100 {
- rc.continueCh <- struct{}{}
- } else {
- close(rc.continueCh)
- }
- }
- if resp.StatusCode == 100 {
- resp, err = ReadResponse(pc.br, rc.req)
- }
- }
- }
-
- if resp != nil {
- resp.TLS = pc.tlsState
+ resp, err = pc.readResponse(rc)
}
- hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0
-
if err != nil {
- pc.close()
- } else {
- if rc.addedGzip && hasBody && resp.Header.Get("Content-Encoding") == "gzip" {
- resp.Header.Del("Content-Encoding")
- resp.Header.Del("Content-Length")
- resp.ContentLength = -1
- resp.Body = &gzipReader{body: resp.Body}
+ // If we won't be able to retry this request later (from the
+ // roundTrip goroutine), mark it as done now.
+ // BEFORE the send on rc.ch, as the client might re-use the
+ // same *Request pointer, and we don't want to set call
+ // t.setReqCanceler from this persistConn while the Transport
+ // potentially spins up a different persistConn for the
+ // caller's subsequent request.
+ if checkTransportResend(err, rc.req, pc) != nil {
+ pc.t.setReqCanceler(rc.req, nil)
}
- resp.Body = &bodyEOFSignal{body: resp.Body}
+ rc.ch <- responseAndError{err: err}
+ return
}
- if err != nil || resp.Close || rc.req.Close || resp.StatusCode <= 199 {
+ pc.lk.Lock()
+ pc.numExpectedResponses--
+ pc.lk.Unlock()
+
+ hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
+
+ if resp.Close || rc.req.Close || resp.StatusCode <= 199 {
// Don't do keep-alive on error if either party requested a close
// or we get an unexpected informational (1xx) response.
// StatusCode 100 is already handled above.
alive = false
}
- var waitForBodyRead chan bool // channel is nil when there's no body
- if hasBody {
- waitForBodyRead = make(chan bool, 2)
- resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error {
- waitForBodyRead <- false
- return nil
- }
- resp.Body.(*bodyEOFSignal).fn = func(err error) error {
- isEOF := err == io.EOF
- waitForBodyRead <- isEOF
- if isEOF {
- <-eofc // see comment at top
- } else if err != nil && pc.isCanceled() {
- return errRequestCanceled
- }
- return err
- }
- } else {
- // Before send on rc.ch, as client might re-use the
- // same *Request pointer, and we don't want to set this
- // on t from this persistConn while the Transport
- // potentially spins up a different persistConn for the
- // caller's subsequent request.
- //
- // If this request will be retried, don't clear the reqCanceler
- // yet or else roundTrip thinks it's been canceled.
- if err == nil ||
- checkTransportResend(err, rc.req, pc) != nil {
- pc.t.setReqCanceler(rc.req, nil)
- }
+ if !hasBody {
+ pc.t.setReqCanceler(rc.req, nil)
+ resc := make(chan *Response) // unbuffered matters; see below
+ rc.ch <- responseAndError{ch: resc} // buffered send
+
+ // Put the idle conn back into the pool before we send the response
+ // so if they process it quickly and make another request, they'll
+ // get this same conn. But we use the unbuffered channel 'rc'
+ // to guarantee that persistConn.roundTrip got out of its select
+ // potentially waiting for this persistConn to close.
+ // but after
+ alive = alive &&
+ !pc.sawEOF &&
+ pc.wroteRequest() &&
+ pc.t.putIdleConn(pc)
+
+ resc <- resp // unbuffered send
+
+ // Now that they've read from the unbuffered channel, they're safely
+ // out of the select that also waits on this goroutine to die, so
+ // we're allowed to exit now if needed (if alive is false)
+ testHookReadLoopBeforeNextRead()
+ continue
}
- pc.lk.Lock()
- pc.numExpectedResponses--
- pc.lk.Unlock()
+ if rc.addedGzip {
+ maybeUngzipResponse(resp)
+ }
+ resp.Body = &bodyEOFSignal{body: resp.Body}
- // The connection might be going away when we put the
- // idleConn below. When that happens, we close the response channel to signal
- // to roundTrip that the connection is gone. roundTrip waits for
- // both closing and a response in a select, so it might choose
- // the close channel, rather than the response.
- // We send the response first so that roundTrip can check
- // if there is a pending one with a non-blocking select
- // on the response channel before erroring out.
- rc.ch <- responseAndError{resp, err}
-
- if hasBody {
- // To avoid a race, wait for the just-returned
- // response body to be fully consumed before peek on
- // the underlying bufio reader.
- select {
- case <-rc.req.Cancel:
- alive = false
- pc.t.CancelRequest(rc.req)
- case bodyEOF := <-waitForBodyRead:
- pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
- alive = alive &&
- bodyEOF &&
- !pc.sawEOF &&
- pc.wroteRequest() &&
- pc.t.putIdleConn(pc)
- if bodyEOF {
- eofc <- struct{}{}
- }
- case <-pc.closech:
- alive = false
+ waitForBodyRead := make(chan bool, 2)
+ resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error {
+ waitForBodyRead <- false
+ return nil
+ }
+ resp.Body.(*bodyEOFSignal).fn = func(err error) error {
+ isEOF := err == io.EOF
+ waitForBodyRead <- isEOF
+ if isEOF {
+ <-eofc // see comment above eofc declaration
+ } else if err != nil && pc.isCanceled() {
+ return errRequestCanceled
}
- } else {
+ return err
+ }
+
+ rc.ch <- responseAndError{r: resp}
+
+ // Before looping back to the top of this function and peeking on
+ // the bufio.Reader, wait for the caller goroutine to finish
+ // reading the response body. (or for cancelation or death)
+ select {
+ case bodyEOF := <-waitForBodyRead:
+ pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
alive = alive &&
+ bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
pc.t.putIdleConn(pc)
+ if bodyEOF {
+ eofc <- struct{}{}
+ }
+ case <-rc.req.Cancel:
+ alive = false
+ pc.t.CancelRequest(rc.req)
+ case <-pc.closech:
+ alive = false
}
+
testHookReadLoopBeforeNextRead()
}
- pc.close()
+}
+
+func maybeUngzipResponse(resp *Response) {
+ if resp.Header.Get("Content-Encoding") == "gzip" {
+ resp.Header.Del("Content-Encoding")
+ resp.Header.Del("Content-Length")
+ resp.ContentLength = -1
+ resp.Body = &gzipReader{body: resp.Body}
+ }
+}
+
+func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
+ if pc.closed {
+ return
+ }
+ if n := pc.br.Buffered(); n > 0 {
+ buf, _ := pc.br.Peek(n)
+ log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
+ }
+ pc.closeLocked()
+}
+
+// readResponse reads an HTTP response (or two, in the case of "Expect:
+// 100-continue") from the server. It returns the final non-100 one.
+func (pc *persistConn) readResponse(rc requestAndChan) (resp *Response, err error) {
+ resp, err = ReadResponse(pc.br, rc.req)
+ if err != nil {
+ return
+ }
+ if rc.continueCh != nil {
+ if resp.StatusCode == 100 {
+ rc.continueCh <- struct{}{}
+ } else {
+ close(rc.continueCh)
+ }
+ }
+ if resp.StatusCode == 100 {
+ resp, err = ReadResponse(pc.br, rc.req)
+ if err != nil {
+ return
+ }
+ }
+ resp.TLS = pc.tlsState
+ return
}
// waitForContinue returns the function to block until
}
}
+// responseAndError is how the goroutine reading from an HTTP/1 server
+// communicates with the goroutine doing the RoundTrip.
type responseAndError struct {
- res *Response
+ ch chan *Response // if non-nil, res should be read from here
+ r *Response // else use this response (see res method)
err error
}
+func (re responseAndError) res() *Response {
+ switch {
+ case re.err != nil:
+ return nil
+ case re.ch != nil:
+ return <-re.ch
+ default:
+ return re.r
+ }
+}
+
type requestAndChan struct {
req *Request
ch chan responseAndError
// testHooks. Always non-nil.
var (
- testHookPersistConnClosedGotRes = nop
- testHookEnterRoundTrip = nop
- testHookWaitResLoop = nop
- testHookRoundTripRetried = nop
- testHookPrePendingDial = nop
- testHookPostPendingDial = nop
+ testHookEnterRoundTrip = nop
+ testHookWaitResLoop = nop
+ testHookRoundTripRetried = nop
+ testHookPrePendingDial = nop
+ testHookPostPendingDial = nop
testHookMu sync.Locker = fakeLocker{} // guards following
testHookReadLoopBeforeNextRead = nop
}
}
if err != nil {
- re = responseAndError{nil, beforeRespHeaderError{err}}
+ re = responseAndError{err: beforeRespHeaderError{err}}
pc.close()
break WaitResponse
}
respHeaderTimer = timer.C
}
case <-pc.closech:
- // The persist connection is dead. This shouldn't
- // usually happen (only with Connection: close responses
- // with no response bodies), but if it does happen it
- // means either a) the remote server hung up on us
- // prematurely, or b) the readLoop sent us a response &
- // closed its closech at roughly the same time, and we
- // selected this case first. If we got a response, readLoop makes sure
- // to send it before it puts the conn and closes the channel.
- // That way, we can fetch the response, if there is one,
- // with a non-blocking receive.
- select {
- case re = <-resc:
- testHookPersistConnClosedGotRes()
- default:
- re = responseAndError{err: beforeRespHeaderError{errClosed}}
- if pc.isCanceled() {
- re = responseAndError{err: errRequestCanceled}
- }
+ var err error
+ if pc.isCanceled() {
+ err = errRequestCanceled
+ } else {
+ err = beforeRespHeaderError{errClosed}
}
+ re = responseAndError{err: err}
break WaitResponse
case <-respHeaderTimer:
pc.close()
if re.err != nil {
pc.t.setReqCanceler(req.Request, nil)
}
- return re.res, re.err
+ return re.res(), re.err
}
// markBroken marks a connection as broken (so it's not reused).
tr := &Transport{DisableKeepAlives: false}
c := &Client{Transport: tr}
+ defer tr.CloseIdleConnections()
// Ensure that we wait for the readLoop to complete before
// calling Head again
res.Body.Close()
}
-// Previously, we used to handle a logical race within RoundTrip by waiting for 100ms
-// in the case of an error. Changing the order of the channel operations got rid of this
-// race.
-//
-// In order to test that the channel op reordering works, we install a hook into the
-// roundTrip function which gets called if we saw the connection go away and
-// we subsequently received a response.
-func TestTransportResponseCloseRace(t *testing.T) {
- if testing.Short() {
- t.Skip("skipping in short mode")
- }
- defer afterTest(t)
-
- ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
- }))
- defer ts.Close()
- sawRace := false
- SetInstallConnClosedHook(func() {
- sawRace = true
- })
- defer SetInstallConnClosedHook(nil)
-
- SetTestHookWaitResLoop(func() {
- // Make the select race much more likely by blocking before
- // the select, so both will be ready by the time the
- // select runs.
- time.Sleep(50 * time.Millisecond)
- })
- defer SetTestHookWaitResLoop(nil)
-
- tr := &Transport{
- DisableKeepAlives: true,
- }
- req, err := NewRequest("GET", ts.URL, nil)
- if err != nil {
- t.Fatal(err)
- }
- // selects are not deterministic, so do this a bunch
- // and see if we handle the logical race at least once.
- for i := 0; i < 10000; i++ {
- resp, err := tr.RoundTrip(req)
- if err != nil {
- t.Fatalf("unexpected error: %s", err)
- continue
- }
- resp.Body.Close()
- if sawRace {
- t.Logf("saw race after %d iterations", i+1)
- break
- }
- }
- if !sawRace {
- t.Errorf("didn't see response/connection going away race")
- }
-}
-
// Test for issue 10474
func TestTransportResponseCancelRace(t *testing.T) {
defer afterTest(t)
}
}
+// Issue 13633: there was a race where we returned bodyless responses
+// to callers before recycling the persistent connection, which meant
+// a client doing two subsequent requests could end up on different
+// connections. It's somewhat harmless but enough tests assume it's
+// not true in order to test other things that it's worth fixing.
+// Plus it's nice to be consistent and not have timing-dependent
+// behavior.
+func TestTransportReuseConnEmptyResponseBody(t *testing.T) {
+ defer afterTest(t)
+ cst := newClientServerTest(t, h1Mode, HandlerFunc(func(w ResponseWriter, r *Request) {
+ w.Header().Set("X-Addr", r.RemoteAddr)
+ // Empty response body.
+ }))
+ defer cst.close()
+ n := 100
+ if testing.Short() {
+ n = 10
+ }
+ var firstAddr string
+ for i := 0; i < n; i++ {
+ res, err := cst.c.Get(cst.ts.URL)
+ if err != nil {
+ log.Fatal(err)
+ }
+ addr := res.Header.Get("X-Addr")
+ if i == 0 {
+ firstAddr = addr
+ } else if addr != firstAddr {
+ t.Fatalf("On request %d, addr %q != original addr %q", i+1, addr, firstAddr)
+ }
+ res.Body.Close()
+ }
+}
+
func wantBody(res *Response, err error, want string) error {
if err != nil {
return err