]> Cypherpunks repositories - gostls13.git/commitdiff
net/http: update bundled http2
authorTom Bergan <tombergan@google.com>
Wed, 9 Aug 2017 01:01:08 +0000 (18:01 -0700)
committerTom Bergan <tombergan@google.com>
Wed, 9 Aug 2017 05:33:09 +0000 (05:33 +0000)
Updates http2 to x/net/http2 git rev 1c05540f687 for:

  http2: fix format argument warnings in tests
  https://golang.org/cl/48090

  http2: retry requests after receiving REFUSED STREAM
  https://golang.org/cl/50471

  http2: block RoundTrip when the Transport hits MaxConcurrentStreams
  https://golang.org/cl/53250

Fixes #13774
Fixes #20985
Fixes #21229

Change-Id: Ie19b4a7cc395a0b7a25fac55f5051faaf94920bb
Reviewed-on: https://go-review.googlesource.com/54052
Run-TryBot: Tom Bergan <tombergan@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Ian Lance Taylor <iant@golang.org>
src/net/http/h2_bundle.go

index 83f1671a5d08d6a4664494de5b7b34cd73f14c2b..6c1077d6786db1bd4cdc05a5dfeb4933fc0d30f0 100644 (file)
@@ -30,6 +30,7 @@ import (
        "io/ioutil"
        "log"
        "math"
+       mathrand "math/rand"
        "net"
        "net/http/httptrace"
        "net/textproto"
@@ -6683,6 +6684,7 @@ type http2ClientConn struct {
        goAwayDebug     string                        // goAway frame's debug data, retained as a string
        streams         map[uint32]*http2clientStream // client-initiated
        nextStreamID    uint32
+       pendingRequests int                       // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
        pings           map[[8]byte]chan struct{} // in flight ping data to notification channel
        bw              *bufio.Writer
        br              *bufio.Reader
@@ -6735,35 +6737,45 @@ type http2clientStream struct {
        resTrailer *Header // client's Response.Trailer
 }
 
-// awaitRequestCancel runs in its own goroutine and waits for the user
-// to cancel a RoundTrip request, its context to expire, or for the
-// request to be done (any way it might be removed from the cc.streams
-// map: peer reset, successful completion, TCP connection breakage,
-// etc)
-func (cs *http2clientStream) awaitRequestCancel(req *Request) {
+// awaitRequestCancel waits for the user to cancel a request or for the done
+// channel to be signaled. A non-nil error is returned only if the request was
+// canceled.
+func http2awaitRequestCancel(req *Request, done <-chan struct{}) error {
        ctx := http2reqContext(req)
        if req.Cancel == nil && ctx.Done() == nil {
-               return
+               return nil
        }
        select {
        case <-req.Cancel:
-               cs.cancelStream()
-               cs.bufPipe.CloseWithError(http2errRequestCanceled)
+               return http2errRequestCanceled
        case <-ctx.Done():
+               return ctx.Err()
+       case <-done:
+               return nil
+       }
+}
+
+// awaitRequestCancel waits for the user to cancel a request, its context to
+// expire, or for the request to be done (any way it might be removed from the
+// cc.streams map: peer reset, successful completion, TCP connection breakage,
+// etc). If the request is canceled, then cs will be canceled and closed.
+func (cs *http2clientStream) awaitRequestCancel(req *Request) {
+       if err := http2awaitRequestCancel(req, cs.done); err != nil {
                cs.cancelStream()
-               cs.bufPipe.CloseWithError(ctx.Err())
-       case <-cs.done:
+               cs.bufPipe.CloseWithError(err)
        }
 }
 
 func (cs *http2clientStream) cancelStream() {
-       cs.cc.mu.Lock()
+       cc := cs.cc
+       cc.mu.Lock()
        didReset := cs.didReset
        cs.didReset = true
-       cs.cc.mu.Unlock()
+       cc.mu.Unlock()
 
        if !didReset {
-               cs.cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
+               cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
+               cc.forgetStreamID(cs.ID)
        }
 }
 
@@ -6848,7 +6860,7 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res
        }
 
        addr := http2authorityAddr(req.URL.Scheme, req.URL.Host)
-       for {
+       for retry := 0; ; retry++ {
                cc, err := t.connPool().GetClientConn(req, addr)
                if err != nil {
                        t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
@@ -6856,9 +6868,25 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res
                }
                http2traceGotConn(req, cc)
                res, err := cc.RoundTrip(req)
-               if err != nil {
-                       if req, err = http2shouldRetryRequest(req, err); err == nil {
-                               continue
+               if err != nil && retry <= 6 {
+                       afterBodyWrite := false
+                       if e, ok := err.(http2afterReqBodyWriteError); ok {
+                               err = e
+                               afterBodyWrite = true
+                       }
+                       if req, err = http2shouldRetryRequest(req, err, afterBodyWrite); err == nil {
+                               // After the first retry, do exponential backoff with 10% jitter.
+                               if retry == 0 {
+                                       continue
+                               }
+                               backoff := float64(uint(1) << (uint(retry) - 1))
+                               backoff += backoff * (0.1 * mathrand.Float64())
+                               select {
+                               case <-time.After(time.Second * time.Duration(backoff)):
+                                       continue
+                               case <-http2reqContext(req).Done():
+                                       return nil, http2reqContext(req).Err()
+                               }
                        }
                }
                if err != nil {
@@ -6879,43 +6907,60 @@ func (t *http2Transport) CloseIdleConnections() {
 }
 
 var (
-       http2errClientConnClosed   = errors.New("http2: client conn is closed")
-       http2errClientConnUnusable = errors.New("http2: client conn not usable")
-
-       http2errClientConnGotGoAway                 = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
-       http2errClientConnGotGoAwayAfterSomeReqBody = errors.New("http2: Transport received Server's graceful shutdown GOAWAY; some request body already written")
+       http2errClientConnClosed    = errors.New("http2: client conn is closed")
+       http2errClientConnUnusable  = errors.New("http2: client conn not usable")
+       http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
 )
 
+// afterReqBodyWriteError is a wrapper around errors returned by ClientConn.RoundTrip.
+// It is used to signal that err happened after part of Request.Body was sent to the server.
+type http2afterReqBodyWriteError struct {
+       err error
+}
+
+func (e http2afterReqBodyWriteError) Error() string {
+       return e.err.Error() + "; some request body already written"
+}
+
 // shouldRetryRequest is called by RoundTrip when a request fails to get
 // response headers. It is always called with a non-nil error.
 // It returns either a request to retry (either the same request, or a
 // modified clone), or an error if the request can't be replayed.
-func http2shouldRetryRequest(req *Request, err error) (*Request, error) {
-       switch err {
-       default:
+func http2shouldRetryRequest(req *Request, err error, afterBodyWrite bool) (*Request, error) {
+       if !http2canRetryError(err) {
                return nil, err
-       case http2errClientConnUnusable, http2errClientConnGotGoAway:
+       }
+       if !afterBodyWrite {
+               return req, nil
+       }
+       // If the Body is nil (or http.NoBody), it's safe to reuse
+       // this request and its Body.
+       if req.Body == nil || http2reqBodyIsNoBody(req.Body) {
                return req, nil
-       case http2errClientConnGotGoAwayAfterSomeReqBody:
-               // If the Body is nil (or http.NoBody), it's safe to reuse
-               // this request and its Body.
-               if req.Body == nil || http2reqBodyIsNoBody(req.Body) {
-                       return req, nil
-               }
-               // Otherwise we depend on the Request having its GetBody
-               // func defined.
-               getBody := http2reqGetBody(req) // Go 1.8: getBody = req.GetBody
-               if getBody == nil {
-                       return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error")
-               }
-               body, err := getBody()
-               if err != nil {
-                       return nil, err
-               }
-               newReq := *req
-               newReq.Body = body
-               return &newReq, nil
        }
+       // Otherwise we depend on the Request having its GetBody
+       // func defined.
+       getBody := http2reqGetBody(req) // Go 1.8: getBody = req.GetBody
+       if getBody == nil {
+               return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
+       }
+       body, err := getBody()
+       if err != nil {
+               return nil, err
+       }
+       newReq := *req
+       newReq.Body = body
+       return &newReq, nil
+}
+
+func http2canRetryError(err error) bool {
+       if err == http2errClientConnUnusable || err == http2errClientConnGotGoAway {
+               return true
+       }
+       if se, ok := err.(http2StreamError); ok {
+               return se.Code == http2ErrCodeRefusedStream
+       }
+       return false
 }
 
 func (t *http2Transport) dialClientConn(addr string, singleUse bool) (*http2ClientConn, error) {
@@ -7079,6 +7124,8 @@ func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) {
        }
 }
 
+// CanTakeNewRequest reports whether the connection can take a new request,
+// meaning it has not been closed or received or sent a GOAWAY.
 func (cc *http2ClientConn) CanTakeNewRequest() bool {
        cc.mu.Lock()
        defer cc.mu.Unlock()
@@ -7090,8 +7137,7 @@ func (cc *http2ClientConn) canTakeNewRequestLocked() bool {
                return false
        }
        return cc.goAway == nil && !cc.closed &&
-               int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
-               cc.nextStreamID < math.MaxInt32
+               int64(cc.nextStreamID)+int64(cc.pendingRequests) < math.MaxInt32
 }
 
 // onIdleTimeout is called from a time.AfterFunc goroutine. It will
@@ -7237,10 +7283,9 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
        hasTrailers := trailers != ""
 
        cc.mu.Lock()
-       cc.lastActive = time.Now()
-       if cc.closed || !cc.canTakeNewRequestLocked() {
+       if err := cc.awaitOpenSlotForRequest(req); err != nil {
                cc.mu.Unlock()
-               return nil, http2errClientConnUnusable
+               return nil, err
        }
 
        body := req.Body
@@ -7335,14 +7380,13 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
                        cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
                }
                if re.err != nil {
-                       if re.err == http2errClientConnGotGoAway {
-                               cc.mu.Lock()
-                               if cs.startedWrite {
-                                       re.err = http2errClientConnGotGoAwayAfterSomeReqBody
-                               }
-                               cc.mu.Unlock()
-                       }
+                       cc.mu.Lock()
+                       afterBodyWrite := cs.startedWrite
+                       cc.mu.Unlock()
                        cc.forgetStreamID(cs.ID)
+                       if afterBodyWrite {
+                               return nil, http2afterReqBodyWriteError{re.err}
+                       }
                        return nil, re.err
                }
                res.Request = req
@@ -7355,31 +7399,31 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
                case re := <-readLoopResCh:
                        return handleReadLoopResponse(re)
                case <-respHeaderTimer:
-                       cc.forgetStreamID(cs.ID)
                        if !hasBody || bodyWritten {
                                cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
                        } else {
                                bodyWriter.cancel()
                                cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel)
                        }
+                       cc.forgetStreamID(cs.ID)
                        return nil, http2errTimeout
                case <-ctx.Done():
-                       cc.forgetStreamID(cs.ID)
                        if !hasBody || bodyWritten {
                                cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
                        } else {
                                bodyWriter.cancel()
                                cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel)
                        }
+                       cc.forgetStreamID(cs.ID)
                        return nil, ctx.Err()
                case <-req.Cancel:
-                       cc.forgetStreamID(cs.ID)
                        if !hasBody || bodyWritten {
                                cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
                        } else {
                                bodyWriter.cancel()
                                cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel)
                        }
+                       cc.forgetStreamID(cs.ID)
                        return nil, http2errRequestCanceled
                case <-cs.peerReset:
                        // processResetStream already removed the
@@ -7406,6 +7450,45 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
        }
 }
 
+// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams.
+// Must hold cc.mu.
+func (cc *http2ClientConn) awaitOpenSlotForRequest(req *Request) error {
+       var waitingForConn chan struct{}
+       var waitingForConnErr error // guarded by cc.mu
+       for {
+               cc.lastActive = time.Now()
+               if cc.closed || !cc.canTakeNewRequestLocked() {
+                       return http2errClientConnUnusable
+               }
+               if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
+                       if waitingForConn != nil {
+                               close(waitingForConn)
+                       }
+                       return nil
+               }
+               // Unfortunately, we cannot wait on a condition variable and channel at
+               // the same time, so instead, we spin up a goroutine to check if the
+               // request is canceled while we wait for a slot to open in the connection.
+               if waitingForConn == nil {
+                       waitingForConn = make(chan struct{})
+                       go func() {
+                               if err := http2awaitRequestCancel(req, waitingForConn); err != nil {
+                                       cc.mu.Lock()
+                                       waitingForConnErr = err
+                                       cc.cond.Broadcast()
+                                       cc.mu.Unlock()
+                               }
+                       }()
+               }
+               cc.pendingRequests++
+               cc.cond.Wait()
+               cc.pendingRequests--
+               if waitingForConnErr != nil {
+                       return waitingForConnErr
+               }
+       }
+}
+
 // requires cc.wmu be held
 func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error {
        first := true // first frame written (HEADERS is first, then CONTINUATION)
@@ -7765,7 +7848,9 @@ func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStr
                        cc.idleTimer.Reset(cc.idleTimeout)
                }
                close(cs.done)
-               cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
+               // Wake up checkResetOrDone via clientStream.awaitFlowControl and
+               // wake up RoundTrip if there is a pending request.
+               cc.cond.Broadcast()
        }
        return cs
 }
@@ -7864,8 +7949,9 @@ func (rl *http2clientConnReadLoop) run() error {
                        cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
                }
                if se, ok := err.(http2StreamError); ok {
-                       if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil {
+                       if cs := cc.streamByID(se.StreamID, false); cs != nil {
                                cs.cc.writeStreamReset(cs.ID, se.Code, err)
+                               cs.cc.forgetStreamID(cs.ID)
                                if se.Cause == nil {
                                        se.Cause = cc.fr.errDetail
                                }
@@ -8187,6 +8273,7 @@ func (b http2transportResponseBody) Close() error {
        }
 
        cs.bufPipe.BreakWithError(http2errClosedResponseBody)
+       cc.forgetStreamID(cs.ID)
        return nil
 }