]> Cypherpunks repositories - gostls13.git/commitdiff
net/http: update bundled http2
authorBrad Fitzpatrick <bradfitz@golang.org>
Tue, 26 Jul 2016 22:17:38 +0000 (00:17 +0200)
committerAndrew Gerrand <adg@golang.org>
Tue, 26 Jul 2016 23:04:15 +0000 (23:04 +0000)
Updates x/net/http2 to git rev 6a513af for:

  http2: return flow control for closed streams
  https://golang.org/cl/25231

  http2: make Transport prefer HTTP response header recv before body write error
  https://golang.org/cl/24984

  http2: make Transport treat "Connection: close" the same as Request.Close
  https://golang.org/cl/24982

Fixes golang/go#16481

Change-Id: Iaddb166387ca2df1cfbbf09a166f8605578bec49
Reviewed-on: https://go-review.googlesource.com/25282
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Andrew Gerrand <adg@golang.org>
src/net/http/h2_bundle.go

index 47e5f577e691e4ccd19e479b23c177b14ac2da6d..a117897bcf42f866b5621f718f42e6573c7fbe66 100644 (file)
@@ -85,7 +85,16 @@ const (
        http2noDialOnMiss = false
 )
 
-func (p *http2clientConnPool) getClientConn(_ *Request, addr string, dialOnMiss bool) (*http2ClientConn, error) {
+func (p *http2clientConnPool) getClientConn(req *Request, addr string, dialOnMiss bool) (*http2ClientConn, error) {
+       if http2isConnectionCloseRequest(req) && dialOnMiss {
+               // It gets its own connection.
+               const singleUse = true
+               cc, err := p.t.dialClientConn(addr, singleUse)
+               if err != nil {
+                       return nil, err
+               }
+               return cc, nil
+       }
        p.mu.Lock()
        for _, cc := range p.conns[addr] {
                if cc.CanTakeNewRequest() {
@@ -128,7 +137,8 @@ func (p *http2clientConnPool) getStartDialLocked(addr string) *http2dialCall {
 
 // run in its own goroutine.
 func (c *http2dialCall) dial(addr string) {
-       c.res, c.err = c.p.t.dialClientConn(addr)
+       const singleUse = false // shared conn
+       c.res, c.err = c.p.t.dialClientConn(addr, singleUse)
        close(c.done)
 
        c.p.mu.Lock()
@@ -3803,6 +3813,9 @@ func (sc *http2serverConn) closeStream(st *http2stream, err error) {
        }
        delete(sc.streams, st.id)
        if p := st.body; p != nil {
+
+               sc.sendWindowUpdate(nil, p.Len())
+
                p.CloseWithError(err)
        }
        st.cw.Close()
@@ -3879,17 +3892,24 @@ func (sc *http2serverConn) processSettingInitialWindowSize(val uint32) error {
 
 func (sc *http2serverConn) processData(f *http2DataFrame) error {
        sc.serveG.check()
+       data := f.Data()
 
        id := f.Header().StreamID
        st, ok := sc.streams[id]
        if !ok || st.state != http2stateOpen || st.gotTrailerHeader {
 
+               if int(sc.inflow.available()) < len(data) {
+                       return http2StreamError{id, http2ErrCodeFlowControl}
+               }
+
+               sc.inflow.take(int32(len(data)))
+               sc.sendWindowUpdate(nil, len(data))
+
                return http2StreamError{id, http2ErrCodeStreamClosed}
        }
        if st.body == nil {
                panic("internal error: should have a body in this state")
        }
-       data := f.Data()
 
        if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
                st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
@@ -4919,9 +4939,10 @@ func (t *http2Transport) initConnPool() {
 // ClientConn is the state of a single HTTP/2 client connection to an
 // HTTP/2 server.
 type http2ClientConn struct {
-       t        *http2Transport
-       tconn    net.Conn             // usually *tls.Conn, except specialized impls
-       tlsState *tls.ConnectionState // nil only for specialized impls
+       t         *http2Transport
+       tconn     net.Conn             // usually *tls.Conn, except specialized impls
+       tlsState  *tls.ConnectionState // nil only for specialized impls
+       singleUse bool                 // whether being used for a single http.Request
 
        // readLoop goroutine fields:
        readerDone chan struct{} // closed on error
@@ -5117,7 +5138,7 @@ func http2shouldRetryRequest(req *Request, err error) bool {
        return err == http2errClientConnUnusable
 }
 
-func (t *http2Transport) dialClientConn(addr string) (*http2ClientConn, error) {
+func (t *http2Transport) dialClientConn(addr string, singleUse bool) (*http2ClientConn, error) {
        host, _, err := net.SplitHostPort(addr)
        if err != nil {
                return nil, err
@@ -5126,7 +5147,7 @@ func (t *http2Transport) dialClientConn(addr string) (*http2ClientConn, error) {
        if err != nil {
                return nil, err
        }
-       return t.NewClientConn(tconn)
+       return t.newClientConn(tconn, singleUse)
 }
 
 func (t *http2Transport) newTLSConfig(host string) *tls.Config {
@@ -5187,6 +5208,10 @@ func (t *http2Transport) expectContinueTimeout() time.Duration {
 }
 
 func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) {
+       return t.newClientConn(c, false)
+}
+
+func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2ClientConn, error) {
        if http2VerboseLogs {
                t.vlogf("http2: Transport creating client conn to %v", c.RemoteAddr())
        }
@@ -5204,6 +5229,7 @@ func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) {
                initialWindowSize:    65535,
                maxConcurrentStreams: 1000,
                streams:              make(map[uint32]*http2clientStream),
+               singleUse:            singleUse,
        }
        cc.cond = sync.NewCond(&cc.mu)
        cc.flow.add(int32(http2initialWindowSize))
@@ -5288,6 +5314,9 @@ func (cc *http2ClientConn) CanTakeNewRequest() bool {
 }
 
 func (cc *http2ClientConn) canTakeNewRequestLocked() bool {
+       if cc.singleUse && cc.nextStreamID > 1 {
+               return false
+       }
        return cc.goAway == nil && !cc.closed &&
                int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
                cc.nextStreamID < 2147483647
@@ -5494,22 +5523,26 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
        bodyWritten := false
        ctx := http2reqContext(req)
 
+       handleReadLoopResponse := func(re http2resAndError) (*Response, error) {
+               res := re.res
+               if re.err != nil || res.StatusCode > 299 {
+
+                       bodyWriter.cancel()
+                       cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
+               }
+               if re.err != nil {
+                       cc.forgetStreamID(cs.ID)
+                       return nil, re.err
+               }
+               res.Request = req
+               res.TLS = cc.tlsState
+               return res, nil
+       }
+
        for {
                select {
                case re := <-readLoopResCh:
-                       res := re.res
-                       if re.err != nil || res.StatusCode > 299 {
-
-                               bodyWriter.cancel()
-                               cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
-                       }
-                       if re.err != nil {
-                               cc.forgetStreamID(cs.ID)
-                               return nil, re.err
-                       }
-                       res.Request = req
-                       res.TLS = cc.tlsState
-                       return res, nil
+                       return handleReadLoopResponse(re)
                case <-respHeaderTimer:
                        cc.forgetStreamID(cs.ID)
                        if !hasBody || bodyWritten {
@@ -5541,6 +5574,12 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
 
                        return nil, cs.resetErr
                case err := <-bodyWriter.resc:
+
+                       select {
+                       case re := <-readLoopResCh:
+                               return handleReadLoopResponse(re)
+                       default:
+                       }
                        if err != nil {
                                return nil, err
                        }
@@ -5932,7 +5971,7 @@ func (rl *http2clientConnReadLoop) cleanup() {
 
 func (rl *http2clientConnReadLoop) run() error {
        cc := rl.cc
-       rl.closeWhenIdle = cc.t.disableKeepAlives()
+       rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
        gotReply := false
        for {
                f, err := cc.fr.ReadFrame()
@@ -6216,10 +6255,27 @@ var http2errClosedResponseBody = errors.New("http2: response body closed")
 
 func (b http2transportResponseBody) Close() error {
        cs := b.cs
-       if cs.bufPipe.Err() != io.EOF {
+       cc := cs.cc
 
-               cs.cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
+       serverSentStreamEnd := cs.bufPipe.Err() == io.EOF
+       unread := cs.bufPipe.Len()
+
+       if unread > 0 || !serverSentStreamEnd {
+               cc.mu.Lock()
+               cc.wmu.Lock()
+               if !serverSentStreamEnd {
+                       cc.fr.WriteRSTStream(cs.ID, http2ErrCodeCancel)
+               }
+
+               if unread > 0 {
+                       cc.inflow.add(int32(unread))
+                       cc.fr.WriteWindowUpdate(0, uint32(unread))
+               }
+               cc.bw.Flush()
+               cc.wmu.Unlock()
+               cc.mu.Unlock()
        }
+
        cs.bufPipe.BreakWithError(http2errClosedResponseBody)
        return nil
 }
@@ -6227,6 +6283,7 @@ func (b http2transportResponseBody) Close() error {
 func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
        cc := rl.cc
        cs := cc.streamByID(f.StreamID, f.StreamEnded())
+       data := f.Data()
        if cs == nil {
                cc.mu.Lock()
                neverSent := cc.nextStreamID
@@ -6237,9 +6294,15 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
                        return http2ConnectionError(http2ErrCodeProtocol)
                }
 
+               if len(data) > 0 {
+                       cc.wmu.Lock()
+                       cc.fr.WriteWindowUpdate(0, uint32(len(data)))
+                       cc.bw.Flush()
+                       cc.wmu.Unlock()
+               }
                return nil
        }
-       if data := f.Data(); len(data) > 0 {
+       if len(data) > 0 {
                if cs.bufPipe.b == nil {
 
                        cc.logf("http2: Transport received DATA frame for closed stream; closing connection")
@@ -6282,7 +6345,7 @@ func (rl *http2clientConnReadLoop) endStreamError(cs *http2clientStream, err err
        }
        cs.bufPipe.closeWithErrorAndCode(err, code)
        delete(rl.activeRes, cs.ID)
-       if cs.req.Close || cs.req.Header.Get("Connection") == "close" {
+       if http2isConnectionCloseRequest(cs.req) {
                rl.closeWhenIdle = true
        }
 }
@@ -6538,6 +6601,12 @@ func (s http2bodyWriterState) scheduleBodyWrite() {
        }
 }
 
+// isConnectionCloseRequest reports whether req should use its own
+// connection for a single request and then close the connection.
+func http2isConnectionCloseRequest(req *Request) bool {
+       return req.Close || httplex.HeaderValuesContainsToken(req.Header["Connection"], "close")
+}
+
 // writeFramer is implemented by any type that is used to write frames.
 type http2writeFramer interface {
        writeFrame(http2writeContext) error