]> Cypherpunks repositories - gostls13.git/commitdiff
http: fix Transport connection re-use race
authorBrad Fitzpatrick <bradfitz@golang.org>
Tue, 5 Apr 2011 02:22:47 +0000 (19:22 -0700)
committerBrad Fitzpatrick <bradfitz@golang.org>
Tue, 5 Apr 2011 02:22:47 +0000 (19:22 -0700)
A connection shouldn't be made available
for re-use until its body has been consumed.

(except in the case of pipelining, which isn't
implemented yet)

This CL fixes some issues seen with heavy load
against Amazon S3.

Subtle implementation detail: to prevent a race
with the client requesting a new connection
before previous one is returned, we actually
have to call putIdleConnection _before_ we
return from the final Read/Close call on the
http.Response.Body.

R=rsc, adg
CC=golang-dev
https://golang.org/cl/4351048

src/pkg/http/transport.go
src/pkg/http/transport_test.go

index fa4120e27af659ce50f4fe939de33176729538ca..797d134aa8511b645783d4c62807fb14e4f509d1 100644 (file)
@@ -424,25 +424,37 @@ func (pc *persistConn) readLoop() {
 
                rc := <-pc.reqch
                resp, err := pc.cc.Read(rc.req)
-               if err == nil && !rc.req.Close {
-                       pc.t.putIdleConn(pc)
-               }
+
                if err == ErrPersistEOF {
                        // Succeeded, but we can't send any more
                        // persistent connections on this again.  We
                        // hide this error to upstream callers.
                        alive = false
                        err = nil
-               } else if err != nil {
+               } else if err != nil || rc.req.Close {
                        alive = false
                }
+
                hasBody := resp != nil && resp.ContentLength != 0
+               var waitForBodyRead chan bool
+               if alive {
+                       if hasBody {
+                               waitForBodyRead = make(chan bool)
+                               resp.Body.(*bodyEOFSignal).fn = func() {
+                                       pc.t.putIdleConn(pc)
+                                       waitForBodyRead <- true
+                               }
+                       } else {
+                               pc.t.putIdleConn(pc)
+                       }
+               }
+
                rc.ch <- responseAndError{resp, err}
 
                // Wait for the just-returned response body to be fully consumed
                // before we race and peek on the underlying bufio reader.
-               if alive && hasBody {
-                       <-resp.Body.(*bodyEOFSignal).ch
+               if waitForBodyRead != nil {
+                       <-waitForBodyRead
                }
        }
 }
@@ -514,33 +526,33 @@ func responseIsKeepAlive(res *Response) bool {
 func readResponseWithEOFSignal(r *bufio.Reader, requestMethod string) (resp *Response, err os.Error) {
        resp, err = ReadResponse(r, requestMethod)
        if err == nil && resp.ContentLength != 0 {
-               resp.Body = &bodyEOFSignal{resp.Body, make(chan bool, 1), false}
+               resp.Body = &bodyEOFSignal{resp.Body, nil}
        }
        return
 }
 
-// bodyEOFSignal wraps a ReadCloser but sends on ch once once
-// the wrapped ReadCloser is fully consumed (including on Close)
+// bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most
+// once, right before the final Read() or Close() call returns, but after
+// EOF has been seen.
 type bodyEOFSignal struct {
        body io.ReadCloser
-       ch   chan bool
-       done bool
+       fn   func()
 }
 
 func (es *bodyEOFSignal) Read(p []byte) (n int, err os.Error) {
        n, err = es.body.Read(p)
-       if err == os.EOF && !es.done {
-               es.ch <- true
-               es.done = true
+       if err == os.EOF && es.fn != nil {
+               es.fn()
+               es.fn = nil
        }
        return
 }
 
 func (es *bodyEOFSignal) Close() (err os.Error) {
        err = es.body.Close()
-       if err == nil && !es.done {
-               es.ch <- true
-               es.done = true
+       if err == nil && es.fn != nil {
+               es.fn()
+               es.fn = nil
        }
        return
 }
index 6a5438d9c6a196d9776fec879a1536806baf082a..8a77a4854978e9c0d3e5a2f19c680fd89fe71fee 100644 (file)
@@ -85,6 +85,7 @@ func TestTransportConnectionCloseOnResponse(t *testing.T) {
                                t.Fatalf("error in connectionClose=%v, req #%d, Do: %v", connectionClose, n, err)
                        }
                        body, err := ioutil.ReadAll(res.Body)
+                       defer res.Body.Close()
                        if err != nil {
                                t.Fatalf("error in connectionClose=%v, req #%d, ReadAll: %v", connectionClose, n, err)
                        }
@@ -154,9 +155,11 @@ func TestTransportIdleCacheKeys(t *testing.T) {
                t.Errorf("After CloseIdleConnections expected %d idle conn cache keys; got %d", e, g)
        }
 
-       if _, _, err := c.Get(ts.URL); err != nil {
+       resp, _, err := c.Get(ts.URL)
+       if err != nil {
                t.Error(err)
        }
+       ioutil.ReadAll(resp.Body)
 
        keys := tr.IdleConnKeysForTesting()
        if e, g := 1, len(keys); e != g {
@@ -187,7 +190,11 @@ func TestTransportMaxPerHostIdleConns(t *testing.T) {
        // ch)
        donech := make(chan bool)
        doReq := func() {
-               c.Get(ts.URL)
+               resp, _, err := c.Get(ts.URL)
+               if err != nil {
+                       t.Error(err)
+               }
+               ioutil.ReadAll(resp.Body)
                donech <- true
        }
        go doReq()