]> Cypherpunks repositories - gostls13.git/commitdiff
net/http: update bundled http2; fixes TestTransportAndServerSharedBodyRace_h2
authorBrad Fitzpatrick <bradfitz@golang.org>
Thu, 7 Jan 2016 00:51:13 +0000 (00:51 +0000)
committerBrad Fitzpatrick <bradfitz@golang.org>
Thu, 7 Jan 2016 01:49:37 +0000 (01:49 +0000)
Update bundled http2 to git rev d1ba260648 (https://golang.org/cl/18288).

Fixes the flaky TestTransportAndServerSharedBodyRace_h2.

Also adds some debugging to TestTransportAndServerSharedBodyRace_h2
which I hope won't ever be necessary again, but I know will be.

Fixes #13556

Change-Id: Ibcf2fc23ec0122dcac8891fdc3bd7f8acddd880e
Reviewed-on: https://go-review.googlesource.com/18289
Reviewed-by: Andrew Gerrand <adg@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>

src/net/http/h2_bundle.go
src/net/http/serve_test.go

index c7bf2ab84d80051854e05bdd8c81078b68f21419..030ca207295cf5beb2e10b666f45b14533f677bb 100644 (file)
@@ -4219,6 +4219,8 @@ type http2clientStream struct {
        peerReset chan struct{} // closed on peer reset
        resetErr  error         // populated before peerReset is closed
 
+       done chan struct{} // closed when stream remove from cc.streams map; close calls guarded by cc.mu
+
        // owned by clientConnReadLoop:
        headersDone  bool // got HEADERS w/ END_HEADERS
        trailersDone bool // got second HEADERS frame w/ END_HEADERS
@@ -4227,7 +4229,11 @@ type http2clientStream struct {
        resTrailer Header // client's Response.Trailer
 }
 
-// awaitRequestCancel runs in its own goroutine and waits for the user's
+// awaitRequestCancel runs in its own goroutine and waits for the user
+// to either cancel a RoundTrip request (using the provided
+// Request.Cancel channel), or for the request to be done (any way it
+// might be removed from the cc.streams map: peer reset, successful
+// completion, TCP connection breakage, etc)
 func (cs *http2clientStream) awaitRequestCancel(cancel <-chan struct{}) {
        if cancel == nil {
                return
@@ -4235,7 +4241,8 @@ func (cs *http2clientStream) awaitRequestCancel(cancel <-chan struct{}) {
        select {
        case <-cancel:
                cs.bufPipe.CloseWithError(http2errRequestCanceled)
-       case <-cs.bufPipe.Done():
+               cs.cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
+       case <-cs.done:
        }
 }
 
@@ -4594,6 +4601,11 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
        cc.mu.Unlock()
 
        if werr != nil {
+               if hasBody {
+                       req.Body.Close()
+               }
+               cc.forgetStreamID(cs.ID)
+
                return nil, werr
        }
 
@@ -4605,26 +4617,47 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
                }()
        }
 
+       readLoopResCh := cs.resc
+       requestCanceledCh := http2requestCancel(req)
+       requestCanceled := false
        for {
                select {
-               case re := <-cs.resc:
+               case re := <-readLoopResCh:
                        res := re.res
                        if re.err != nil || res.StatusCode > 299 {
 
                                cs.abortRequestBodyWrite()
                        }
                        if re.err != nil {
+                               cc.forgetStreamID(cs.ID)
                                return nil, re.err
                        }
                        res.Request = req
                        res.TLS = cc.tlsState
                        return res, nil
-               case <-http2requestCancel(req):
+               case <-requestCanceledCh:
+                       cc.forgetStreamID(cs.ID)
                        cs.abortRequestBodyWrite()
-                       return nil, http2errRequestCanceled
+                       if !hasBody {
+                               cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
+                               return nil, http2errRequestCanceled
+                       }
+
+                       requestCanceled = true
+                       requestCanceledCh = nil
+                       readLoopResCh = nil
                case <-cs.peerReset:
+                       if requestCanceled {
+
+                               return nil, http2errRequestCanceled
+                       }
+
                        return nil, cs.resetErr
                case err := <-bodyCopyErrc:
+                       if requestCanceled {
+                               cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
+                               return nil, http2errRequestCanceled
+                       }
                        if err != nil {
                                return nil, err
                        }
@@ -4856,6 +4889,7 @@ func (cc *http2ClientConn) newStream() *http2clientStream {
                ID:        cc.nextStreamID,
                resc:      make(chan http2resAndError, 1),
                peerReset: make(chan struct{}),
+               done:      make(chan struct{}),
        }
        cs.flow.add(int32(cc.initialWindowSize))
        cs.flow.setConnFlow(&cc.flow)
@@ -4866,12 +4900,17 @@ func (cc *http2ClientConn) newStream() *http2clientStream {
        return cs
 }
 
+func (cc *http2ClientConn) forgetStreamID(id uint32) {
+       cc.streamByID(id, true)
+}
+
 func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStream {
        cc.mu.Lock()
        defer cc.mu.Unlock()
        cs := cc.streams[id]
-       if andRemove {
+       if andRemove && cs != nil {
                delete(cc.streams, id)
+               close(cs.done)
        }
        return cs
 }
@@ -4926,6 +4965,7 @@ func (rl *http2clientConnReadLoop) cleanup() {
                case cs.resc <- http2resAndError{err: err}:
                default:
                }
+               close(cs.done)
        }
        cc.closed = true
        cc.cond.Broadcast()
@@ -5291,6 +5331,7 @@ func (cc *http2ClientConn) writeStreamReset(streamID uint32, code http2ErrCode,
 
        cc.wmu.Lock()
        cc.fr.WriteRSTStream(streamID, code)
+       cc.bw.Flush()
        cc.wmu.Unlock()
 }
 
index cbe85d255bb4d25f04d4e9525f82a5949de00c3a..4a006fb369bf512fc73c5030509f3c0aa0000054 100644 (file)
@@ -27,6 +27,7 @@ import (
        "os/exec"
        "reflect"
        "runtime"
+       "runtime/debug"
        "sort"
        "strconv"
        "strings"
@@ -3038,7 +3039,6 @@ func TestTransportAndServerSharedBodyRace_h1(t *testing.T) {
        testTransportAndServerSharedBodyRace(t, h1Mode)
 }
 func TestTransportAndServerSharedBodyRace_h2(t *testing.T) {
-       t.Skip("failing in http2 mode; golang.org/issue/13556")
        testTransportAndServerSharedBodyRace(t, h2Mode)
 }
 func testTransportAndServerSharedBodyRace(t *testing.T, h2 bool) {
@@ -3046,11 +3046,40 @@ func testTransportAndServerSharedBodyRace(t *testing.T, h2 bool) {
 
        const bodySize = 1 << 20
 
+       // errorf is like t.Errorf, but also writes to println.  When
+       // this test fails, it hangs. This helps debugging and I've
+       // added this enough times "temporarily".  It now gets added
+       // full time.
+       errorf := func(format string, args ...interface{}) {
+               v := fmt.Sprintf(format, args...)
+               println(v)
+               t.Error(v)
+       }
+
        unblockBackend := make(chan bool)
        backend := newClientServerTest(t, h2, HandlerFunc(func(rw ResponseWriter, req *Request) {
-               io.CopyN(rw, req.Body, bodySize)
+               gone := rw.(CloseNotifier).CloseNotify()
+               didCopy := make(chan interface{})
+               go func() {
+                       n, err := io.CopyN(rw, req.Body, bodySize)
+                       didCopy <- []interface{}{n, err}
+               }()
+               isGone := false
+       Loop:
+               for {
+                       select {
+                       case <-didCopy:
+                               break Loop
+                       case <-gone:
+                               isGone = true
+                       case <-time.After(time.Second):
+                               println("1 second passes in backend, proxygone=", isGone)
+                       }
+               }
                <-unblockBackend
        }))
+       var quitTimer *time.Timer
+       defer func() { quitTimer.Stop() }()
        defer backend.close()
 
        backendRespc := make(chan *Response, 1)
@@ -3063,17 +3092,17 @@ func testTransportAndServerSharedBodyRace(t *testing.T, h2 bool) {
 
                bresp, err := proxy.c.Do(req2)
                if err != nil {
-                       t.Errorf("Proxy outbound request: %v", err)
+                       errorf("Proxy outbound request: %v", err)
                        return
                }
                _, err = io.CopyN(ioutil.Discard, bresp.Body, bodySize/2)
                if err != nil {
-                       t.Errorf("Proxy copy error: %v", err)
+                       errorf("Proxy copy error: %v", err)
                        return
                }
                backendRespc <- bresp // to close later
 
-               // Try to cause a race: Both the DefaultTransport and the proxy handler's Server
+               // Try to cause a race: Both the Transport and the proxy handler's Server
                // will try to read/close req.Body (aka req2.Body)
                if h2 {
                        close(cancel)
@@ -3083,6 +3112,20 @@ func testTransportAndServerSharedBodyRace(t *testing.T, h2 bool) {
                rw.Write([]byte("OK"))
        }))
        defer proxy.close()
+       defer func() {
+               // Before we shut down our two httptest.Servers, start a timer.
+               // We choose 7 seconds because httptest.Server starts logging
+               // warnings to stderr at 5 seconds. If we don't disarm this bomb
+               // in 7 seconds (after the two httptest.Server.Close calls above),
+               // then we explode with stacks.
+               quitTimer = time.AfterFunc(7*time.Second, func() {
+                       debug.SetTraceback("ALL")
+                       stacks := make([]byte, 1<<20)
+                       stacks = stacks[:runtime.Stack(stacks, true)]
+                       fmt.Fprintf(os.Stderr, "%s", stacks)
+                       log.Fatalf("Timeout.")
+               })
+       }()
 
        defer close(unblockBackend)
        req, _ := NewRequest("POST", proxy.ts.URL, io.LimitReader(neverEnding('a'), bodySize))