From 16f846a9cbe747b13498761f1dd1a298478ec43e Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Thu, 19 May 2016 17:35:23 +0000 Subject: [PATCH] net/http: update bundled http2 Updates x/net/http2 to git rev 202ff482 for https://golang.org/cl/23235 (Expect: 100-continue support for HTTP/2) Fixes a flaky test too, and changes the automatic HTTP/2 behavior to no longer special-case the DefaultTransport, because ExpectContinueTimeout is no longer unsupported by the HTTP/2 transport. Fixes #13851 Fixes #15744 Change-Id: I3522aace14179a1ca070fd7063368a831167a0f7 Reviewed-on: https://go-review.googlesource.com/23254 Reviewed-by: Andrew Gerrand --- src/net/http/h2_bundle.go | 168 +++++++++++++++++++++++++++------ src/net/http/transport.go | 15 +-- src/net/http/transport_test.go | 17 ++-- 3 files changed, 153 insertions(+), 47 deletions(-) diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index 6f7fd382ea..633bdeadb7 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -1975,6 +1975,10 @@ func http2summarizeFrame(f http2Frame) string { return buf.String() } +func http2transportExpectContinueTimeout(t1 *Transport) time.Duration { + return t1.ExpectContinueTimeout +} + type http2contextContext interface { context.Context } @@ -2010,8 +2014,8 @@ func http2traceGotConn(req *Request, cc *http2ClientConn) { ci := httptrace.GotConnInfo{Conn: cc.tconn} cc.mu.Lock() ci.Reused = cc.nextStreamID > 1 - ci.WasIdle = len(cc.streams) == 0 - if ci.WasIdle { + ci.WasIdle = len(cc.streams) == 0 && ci.Reused + if ci.WasIdle && !cc.lastActive.IsZero() { ci.IdleTime = time.Now().Sub(cc.lastActive) } cc.mu.Unlock() @@ -2025,6 +2029,18 @@ func http2traceWroteHeaders(trace *http2clientTrace) { } } +func http2traceGot100Continue(trace *http2clientTrace) { + if trace != nil && trace.Got100Continue != nil { + trace.Got100Continue() + } +} + +func http2traceWait100Continue(trace *http2clientTrace) { + if trace != nil && trace.Wait100Continue != nil { + trace.Wait100Continue() + } +} + func http2traceWroteRequest(trace *http2clientTrace, err error) { if trace != nil && trace.WroteRequest != nil { trace.WroteRequest(httptrace.WroteRequestInfo{Err: err}) @@ -4906,6 +4922,7 @@ type http2clientStream struct { resc chan http2resAndError bufPipe http2pipe // buffered pipe with the flow-controlled response payload requestedGzip bool + on100 func() // optional code to run if get a 100 continue response flow http2flow // guarded by cc.mu inflow http2flow // guarded by cc.mu @@ -5114,6 +5131,13 @@ func (t *http2Transport) disableKeepAlives() bool { return t.t1 != nil && t.t1.DisableKeepAlives } +func (t *http2Transport) expectContinueTimeout() time.Duration { + if t.t1 == nil { + return 0 + } + return http2transportExpectContinueTimeout(t.t1) +} + func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) { if http2VerboseLogs { t.vlogf("http2: Transport creating client conn to %v", c.RemoteAddr()) @@ -5311,6 +5335,30 @@ func http2checkConnHeaders(req *Request) error { return nil } +func http2bodyAndLength(req *Request) (body io.Reader, contentLen int64) { + body = req.Body + if body == nil { + return nil, 0 + } + if req.ContentLength != 0 { + return req.Body, req.ContentLength + } + + // We have a body but a zero content length. Test to see if + // it's actually zero or just unset. + var buf [1]byte + n, rerr := io.ReadFull(body, buf[:]) + if rerr != nil && rerr != io.EOF { + return http2errorReader{rerr}, -1 + } + if n == 1 { + + return io.MultiReader(bytes.NewReader(buf[:]), body), -1 + } + + return nil, 0 +} + func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { if err := http2checkConnHeaders(req); err != nil { return nil, err @@ -5322,24 +5370,8 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { } hasTrailers := trailers != "" - var body io.Reader = req.Body - contentLen := req.ContentLength - if req.Body != nil && contentLen == 0 { - // Test to see if it's actually zero or just unset. - var buf [1]byte - n, rerr := io.ReadFull(body, buf[:]) - if rerr != nil && rerr != io.EOF { - contentLen = -1 - body = http2errorReader{rerr} - } else if n == 1 { - - contentLen = -1 - body = io.MultiReader(bytes.NewReader(buf[:]), body) - } else { - - body = nil - } - } + body, contentLen := http2bodyAndLength(req) + hasBody := body != nil cc.mu.Lock() cc.lastActive = time.Now() @@ -5367,8 +5399,9 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { cs := cc.newStream() cs.req = req cs.trace = http2requestTrace(req) - hasBody := body != nil cs.requestedGzip = requestedGzip + bodyWriter := cc.t.getBodyWriterState(cs, body) + cs.on100 = bodyWriter.on100 cc.wmu.Lock() endStream := !hasBody && !hasTrailers @@ -5380,6 +5413,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { if werr != nil { if hasBody { req.Body.Close() + bodyWriter.cancel() } cc.forgetStreamID(cs.ID) @@ -5388,12 +5422,8 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { } var respHeaderTimer <-chan time.Time - var bodyCopyErrc chan error // result of body copy if hasBody { - bodyCopyErrc = make(chan error, 1) - go func() { - bodyCopyErrc <- cs.writeRequestBody(body, req.Body) - }() + bodyWriter.scheduleBodyWrite() } else { http2traceWroteRequest(cs.trace, nil) if d := cc.responseHeaderTimeout(); d != 0 { @@ -5413,6 +5443,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { res := re.res if re.err != nil || res.StatusCode > 299 { + bodyWriter.cancel() cs.abortRequestBodyWrite(http2errStopReqBodyWrite) } if re.err != nil { @@ -5427,6 +5458,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { if !hasBody || bodyWritten { cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) } else { + bodyWriter.cancel() cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) } return nil, http2errTimeout @@ -5435,6 +5467,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { if !hasBody || bodyWritten { cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) } else { + bodyWriter.cancel() cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) } return nil, ctx.Err() @@ -5443,14 +5476,14 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { if !hasBody || bodyWritten { cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) } else { + bodyWriter.cancel() cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) } return nil, http2errRequestCanceled case <-cs.peerReset: return nil, cs.resetErr - case err := <-bodyCopyErrc: - http2traceWroteRequest(cs.trace, err) + case err := <-bodyWriter.resc: if err != nil { return nil, err } @@ -5508,6 +5541,7 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos defer cc.putFrameScratchBuffer(buf) defer func() { + http2traceWroteRequest(cs.trace, err) cerr := bodyCloser.Close() if err == nil { @@ -5934,7 +5968,10 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http } if statusCode == 100 { - + http2traceGot100Continue(cs.trace) + if cs.on100 != nil { + cs.on100() + } cs.pastHeaders = false return nil, nil } @@ -6344,6 +6381,79 @@ type http2errorReader struct{ err error } func (r http2errorReader) Read(p []byte) (int, error) { return 0, r.err } +// bodyWriterState encapsulates various state around the Transport's writing +// of the request body, particularly regarding doing delayed writes of the body +// when the request contains "Expect: 100-continue". +type http2bodyWriterState struct { + cs *http2clientStream + timer *time.Timer // if non-nil, we're doing a delayed write + fnonce *sync.Once // to call fn with + fn func() // the code to run in the goroutine, writing the body + resc chan error // result of fn's execution + delay time.Duration // how long we should delay a delayed write for +} + +func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reader) (s http2bodyWriterState) { + s.cs = cs + if body == nil { + return + } + resc := make(chan error, 1) + s.resc = resc + s.fn = func() { + resc <- cs.writeRequestBody(body, cs.req.Body) + } + s.delay = t.expectContinueTimeout() + if s.delay == 0 || + !httplex.HeaderValuesContainsToken( + cs.req.Header["Expect"], + "100-continue") { + return + } + s.fnonce = new(sync.Once) + + // Arm the timer with a very large duration, which we'll + // intentionally lower later. It has to be large now because + // we need a handle to it before writing the headers, but the + // s.delay value is defined to not start until after the + // request headers were written. + const hugeDuration = 365 * 24 * time.Hour + s.timer = time.AfterFunc(hugeDuration, func() { + s.fnonce.Do(s.fn) + }) + return +} + +func (s http2bodyWriterState) cancel() { + if s.timer != nil { + s.timer.Stop() + } +} + +func (s http2bodyWriterState) on100() { + if s.timer == nil { + + return + } + s.timer.Stop() + go func() { s.fnonce.Do(s.fn) }() +} + +// scheduleBodyWrite starts writing the body, either immediately (in +// the common case) or after the delay timeout. It should not be +// called until after the headers have been written. +func (s http2bodyWriterState) scheduleBodyWrite() { + if s.timer == nil { + + go s.fn() + return + } + http2traceWait100Continue(s.cs.trace) + if s.timer.Stop() { + s.timer.Reset(s.delay) + } +} + // writeFramer is implemented by any type that is used to write frames. type http2writeFramer interface { writeFrame(http2writeContext) error diff --git a/src/net/http/transport.go b/src/net/http/transport.go index 777501f5bd..37fa7a0783 100644 --- a/src/net/http/transport.go +++ b/src/net/http/transport.go @@ -205,15 +205,6 @@ func (t *Transport) onceSetNextProtoDefaults() { // by modifying their tls.Config. Issue 14275. return } - if t.ExpectContinueTimeout != 0 && t != DefaultTransport { - // ExpectContinueTimeout is unsupported in http2, so - // if they explicitly asked for it (as opposed to just - // using the DefaultTransport, which sets it), then - // disable http2 for now. - // - // Issue 13851. (and changed in Issue 14391) - return - } t2, err := http2configureTransport(t) if err != nil { log.Printf("Error enabling Transport HTTP/2 support: %v", err) @@ -854,7 +845,7 @@ func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistC select { case v := <-dialc: // Our dial finished. - if trace != nil && trace.GotConn != nil && v.pc != nil { + if trace != nil && trace.GotConn != nil && v.pc != nil && v.pc.alt == nil { trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn}) } return v.pc, v.err @@ -1243,7 +1234,9 @@ func (pc *persistConn) gotIdleConnTrace(idleAt time.Time) (t httptrace.GotConnIn t.Reused = pc.reused t.Conn = pc.conn t.WasIdle = true - t.IdleTime = time.Since(idleAt) + if !idleAt.IsZero() { + t.IdleTime = time.Since(idleAt) + } return } diff --git a/src/net/http/transport_test.go b/src/net/http/transport_test.go index ab05c31cb5..b80c151a24 100644 --- a/src/net/http/transport_test.go +++ b/src/net/http/transport_test.go @@ -2983,7 +2983,7 @@ func TestTransportAutomaticHTTP2_TLSConfig(t *testing.T) { func TestTransportAutomaticHTTP2_ExpectContinueTimeout(t *testing.T) { testTransportAutoHTTP(t, &Transport{ ExpectContinueTimeout: 1 * time.Second, - }, false) + }, true) } func TestTransportAutomaticHTTP2_Dial(t *testing.T) { @@ -3225,9 +3225,8 @@ func testTransportEventTrace(t *testing.T, h2 bool, noHooks bool) { io.WriteString(w, resBody) })) defer cst.close() - if !h2 { - cst.tr.ExpectContinueTimeout = 1 * time.Second - } + + cst.tr.ExpectContinueTimeout = 1 * time.Second var mu sync.Mutex var buf bytes.Buffer @@ -3283,10 +3282,12 @@ func testTransportEventTrace(t *testing.T, h2 bool, noHooks bool) { if err != nil { t.Fatal(err) } + logf("got roundtrip.response") slurp, err := ioutil.ReadAll(res.Body) if err != nil { t.Fatal(err) } + logf("consumed body") if string(slurp) != resBody || res.StatusCode != 200 { t.Fatalf("Got %q, %v; want %q, 200 OK", slurp, res.Status, resBody) } @@ -3305,6 +3306,9 @@ func testTransportEventTrace(t *testing.T, h2 bool, noHooks bool) { t.Errorf("expected substring %q in output.", sub) } } + if strings.Count(got, "got conn: {") != 1 { + t.Errorf("expected exactly 1 \"got conn\" event.") + } wantSub("Getting conn for dns-is-faked.golang:" + port) wantSub("DNS start: {Host:dns-is-faked.golang}") wantSub("DNS done: {Addrs:[{IP:" + ip + " Zone:}] Err: Coalesced:false}") @@ -3314,10 +3318,9 @@ func testTransportEventTrace(t *testing.T, h2 bool, noHooks bool) { wantSub("first response byte") if !h2 { wantSub("PutIdleConn = ") - // TODO: implement these next two for Issue 13851 - wantSub("Wait100Continue") - wantSub("Got100Continue") } + wantSub("Wait100Continue") + wantSub("Got100Continue") wantSub("WroteRequest: {Err:}") if strings.Contains(got, " to udp ") { t.Errorf("should not see UDP (DNS) connections") -- 2.50.0