peerReset chan struct{} // closed on peer reset
resetErr error // populated before peerReset is closed
+ done chan struct{} // closed when the stream is removed from the cc.streams map; close calls are guarded by cc.mu
+
// owned by clientConnReadLoop:
headersDone bool // got HEADERS w/ END_HEADERS
trailersDone bool // got second HEADERS frame w/ END_HEADERS
resTrailer Header // client's Response.Trailer
}
-// awaitRequestCancel runs in its own goroutine and waits for the user's
+// awaitRequestCancel runs in its own goroutine and waits either for
+// the user to cancel a RoundTrip request (via the provided
+// Request.Cancel channel) or for the request to be done (any way it
+// might be removed from the cc.streams map: peer reset, successful
+// completion, TCP connection breakage, etc.).
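+//
+// A caller typically arranges cancelation roughly like this (a
+// sketch; ch and req are the caller's own values):
+//
+//	ch := make(chan struct{})
+//	req.Cancel = ch
+//	// ... RoundTrip(req) in flight ...
+//	close(ch) // fires the <-cancel case below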
func (cs *http2clientStream) awaitRequestCancel(cancel <-chan struct{}) {
if cancel == nil {
return
}
select {
case <-cancel:
cs.bufPipe.CloseWithError(http2errRequestCanceled)
- case <-cs.bufPipe.Done():
+ cs.cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
+ case <-cs.done:
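+ // The request completed (the stream was removed from
+ // cc.streams); there is nothing left to cancel.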
}
}
cc.mu.Unlock()
if werr != nil {
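+ // Writing the request headers failed, so the stream is dead:
+ // clean up the body and drop the stream from cc.streams.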
+ if hasBody {
+ req.Body.Close()
+ }
+ cc.forgetStreamID(cs.ID)
+
return nil, werr
}
}()
}
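+ // readLoopResCh and requestCanceledCh are local copies so the loop
+ // below can set them to nil (disabling their select cases) once the
+ // request is canceled while its body is still being written.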
+ readLoopResCh := cs.resc
+ requestCanceledCh := http2requestCancel(req)
+ requestCanceled := false
for {
select {
- case re := <-cs.resc:
+ case re := <-readLoopResCh:
res := re.res
if re.err != nil || res.StatusCode > 299 {
cs.abortRequestBodyWrite()
}
if re.err != nil {
+ cc.forgetStreamID(cs.ID)
return nil, re.err
}
res.Request = req
res.TLS = cc.tlsState
return res, nil
- case <-http2requestCancel(req):
+ case <-requestCanceledCh:
+ cc.forgetStreamID(cs.ID)
cs.abortRequestBodyWrite()
- return nil, http2errRequestCanceled
+ if !hasBody {
+ cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
+ return nil, http2errRequestCanceled
+ }
+
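+ // The body is still being written; note the cancelation and
+ // defer the RST_STREAM until the bodyCopyErrc case fires.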
+ requestCanceled = true
+ requestCanceledCh = nil
+ readLoopResCh = nil
case <-cs.peerReset:
+ if requestCanceled {
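+ // Already canceling; report the cancelation rather than
+ // the peer's reset.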
+ return nil, http2errRequestCanceled
+ }
+
return nil, cs.resetErr
case err := <-bodyCopyErrc:
+ if requestCanceled {
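+ // The body write has finished, so the RST_STREAM deferred
+ // by the cancelation above can go out now.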
+ cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
+ return nil, http2errRequestCanceled
+ }
if err != nil {
return nil, err
}
ID: cc.nextStreamID,
resc: make(chan http2resAndError, 1),
peerReset: make(chan struct{}),
+ done: make(chan struct{}),
}
cs.flow.add(int32(cc.initialWindowSize))
cs.flow.setConnFlow(&cc.flow)
return cs
}
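+// forgetStreamID removes the stream with the given ID from the
+// cc.streams map, closing its done channel as a side effect.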
+func (cc *http2ClientConn) forgetStreamID(id uint32) {
+ cc.streamByID(id, true)
+}
+
func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStream {
cc.mu.Lock()
defer cc.mu.Unlock()
cs := cc.streams[id]
- if andRemove {
+ if andRemove && cs != nil {
delete(cc.streams, id)
+ close(cs.done)
}
return cs
}
case cs.resc <- http2resAndError{err: err}:
default:
}
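+ // Wake any goroutines (e.g. awaitRequestCancel) blocked on
+ // cs.done; this connection is going away.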
+ close(cs.done)
}
cc.closed = true
cc.cond.Broadcast()
cc.wmu.Lock()
cc.fr.WriteRSTStream(streamID, code)
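+ // Flush immediately so the RST_STREAM frame isn't left
+ // sitting in the buffered writer.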
+ cc.bw.Flush()
cc.wmu.Unlock()
}
"os/exec"
"reflect"
"runtime"
+ "runtime/debug"
"sort"
"strconv"
"strings"
testTransportAndServerSharedBodyRace(t, h1Mode)
}
func TestTransportAndServerSharedBodyRace_h2(t *testing.T) {
- t.Skip("failing in http2 mode; golang.org/issue/13556")
testTransportAndServerSharedBodyRace(t, h2Mode)
}
func testTransportAndServerSharedBodyRace(t *testing.T, h2 bool) {
const bodySize = 1 << 20
+ // errorf is like t.Errorf, but also writes to println. When
+ // this test fails, it hangs, and the println output helps with
+ // debugging. This helper has been added "temporarily" enough
+ // times that it now stays for good.
+ errorf := func(format string, args ...interface{}) {
+ v := fmt.Sprintf(format, args...)
+ println(v)
+ t.Error(v)
+ }
+
unblockBackend := make(chan bool)
backend := newClientServerTest(t, h2, HandlerFunc(func(rw ResponseWriter, req *Request) {
- io.CopyN(rw, req.Body, bodySize)
+ gone := rw.(CloseNotifier).CloseNotify()
+ didCopy := make(chan interface{})
+ go func() {
+ n, err := io.CopyN(rw, req.Body, bodySize)
+ didCopy <- []interface{}{n, err}
+ }()
+ isGone := false
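+ // Wait for the copy to finish, logging once a second so a hang
+ // here is visible in the test output.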
+ Loop:
+ for {
+ select {
+ case <-didCopy:
+ break Loop
+ case <-gone:
+ isGone = true
+ case <-time.After(time.Second):
+ println("1 second passes in backend, proxygone=", isGone)
+ }
+ }
<-unblockBackend
}))
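+ // quitTimer is armed in a defer below (before the servers shut
+ // down) and disarmed by this Stop, which runs last (defers are
+ // LIFO) once everything has closed cleanly.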
+ var quitTimer *time.Timer
+ defer func() { quitTimer.Stop() }()
defer backend.close()
backendRespc := make(chan *Response, 1)
bresp, err := proxy.c.Do(req2)
if err != nil {
- t.Errorf("Proxy outbound request: %v", err)
+ errorf("Proxy outbound request: %v", err)
return
}
_, err = io.CopyN(ioutil.Discard, bresp.Body, bodySize/2)
if err != nil {
- t.Errorf("Proxy copy error: %v", err)
+ errorf("Proxy copy error: %v", err)
return
}
backendRespc <- bresp // to close later
- // Try to cause a race: Both the DefaultTransport and the proxy handler's Server
+ // Try to cause a race: Both the Transport and the proxy handler's Server
// will try to read/close req.Body (aka req2.Body)
if h2 {
close(cancel)
rw.Write([]byte("OK"))
}))
defer proxy.close()
+ defer func() {
+ // Before we shut down our two httptest.Servers, start a timer.
+ // We choose 7 seconds because httptest.Server starts logging
+ // warnings to stderr at 5 seconds. If we don't disarm this bomb
+ // in 7 seconds (after the two httptest.Server.Close calls above),
+ // then we explode with stacks.
+ quitTimer = time.AfterFunc(7*time.Second, func() {
+ debug.SetTraceback("ALL")
+ stacks := make([]byte, 1<<20)
+ stacks = stacks[:runtime.Stack(stacks, true)]
+ fmt.Fprintf(os.Stderr, "%s", stacks)
+ log.Fatalf("Timeout.")
+ })
+ }()
defer close(unblockBackend)
req, _ := NewRequest("POST", proxy.ts.URL, io.LimitReader(neverEnding('a'), bodySize))