From: Tom Bergan Date: Wed, 9 Aug 2017 01:01:08 +0000 (-0700) Subject: net/http: update bundled http2 X-Git-Tag: go1.10beta1~1689 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=6b6b9f69fd38ca285b33917d6201dbcb11ca0324;p=gostls13.git net/http: update bundled http2 Updates http2 to x/net/http2 git rev 1c05540f687 for: http2: fix format argument warnings in tests https://golang.org/cl/48090 http2: retry requests after receiving REFUSED STREAM https://golang.org/cl/50471 http2: block RoundTrip when the Transport hits MaxConcurrentStreams https://golang.org/cl/53250 Fixes #13774 Fixes #20985 Fixes #21229 Change-Id: Ie19b4a7cc395a0b7a25fac55f5051faaf94920bb Reviewed-on: https://go-review.googlesource.com/54052 Run-TryBot: Tom Bergan TryBot-Result: Gobot Gobot Reviewed-by: Ian Lance Taylor --- diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index 83f1671a5d..6c1077d678 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -30,6 +30,7 @@ import ( "io/ioutil" "log" "math" + mathrand "math/rand" "net" "net/http/httptrace" "net/textproto" @@ -6683,6 +6684,7 @@ type http2ClientConn struct { goAwayDebug string // goAway frame's debug data, retained as a string streams map[uint32]*http2clientStream // client-initiated nextStreamID uint32 + pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams pings map[[8]byte]chan struct{} // in flight ping data to notification channel bw *bufio.Writer br *bufio.Reader @@ -6735,35 +6737,45 @@ type http2clientStream struct { resTrailer *Header // client's Response.Trailer } -// awaitRequestCancel runs in its own goroutine and waits for the user -// to cancel a RoundTrip request, its context to expire, or for the -// request to be done (any way it might be removed from the cc.streams -// map: peer reset, successful completion, TCP connection breakage, -// etc) -func (cs *http2clientStream) awaitRequestCancel(req *Request) { +// awaitRequestCancel waits for the user to cancel a request or for the done +// channel to be signaled. A non-nil error is returned only if the request was +// canceled. +func http2awaitRequestCancel(req *Request, done <-chan struct{}) error { ctx := http2reqContext(req) if req.Cancel == nil && ctx.Done() == nil { - return + return nil } select { case <-req.Cancel: - cs.cancelStream() - cs.bufPipe.CloseWithError(http2errRequestCanceled) + return http2errRequestCanceled case <-ctx.Done(): + return ctx.Err() + case <-done: + return nil + } +} + +// awaitRequestCancel waits for the user to cancel a request, its context to +// expire, or for the request to be done (any way it might be removed from the +// cc.streams map: peer reset, successful completion, TCP connection breakage, +// etc). If the request is canceled, then cs will be canceled and closed. +func (cs *http2clientStream) awaitRequestCancel(req *Request) { + if err := http2awaitRequestCancel(req, cs.done); err != nil { cs.cancelStream() - cs.bufPipe.CloseWithError(ctx.Err()) - case <-cs.done: + cs.bufPipe.CloseWithError(err) } } func (cs *http2clientStream) cancelStream() { - cs.cc.mu.Lock() + cc := cs.cc + cc.mu.Lock() didReset := cs.didReset cs.didReset = true - cs.cc.mu.Unlock() + cc.mu.Unlock() if !didReset { - cs.cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) + cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) + cc.forgetStreamID(cs.ID) } } @@ -6848,7 +6860,7 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res } addr := http2authorityAddr(req.URL.Scheme, req.URL.Host) - for { + for retry := 0; ; retry++ { cc, err := t.connPool().GetClientConn(req, addr) if err != nil { t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err) @@ -6856,9 +6868,25 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res } http2traceGotConn(req, cc) res, err := cc.RoundTrip(req) - if err != nil { - if req, err = http2shouldRetryRequest(req, err); err == nil { - continue + if err != nil && retry <= 6 { + afterBodyWrite := false + if e, ok := err.(http2afterReqBodyWriteError); ok { + err = e + afterBodyWrite = true + } + if req, err = http2shouldRetryRequest(req, err, afterBodyWrite); err == nil { + // After the first retry, do exponential backoff with 10% jitter. + if retry == 0 { + continue + } + backoff := float64(uint(1) << (uint(retry) - 1)) + backoff += backoff * (0.1 * mathrand.Float64()) + select { + case <-time.After(time.Second * time.Duration(backoff)): + continue + case <-http2reqContext(req).Done(): + return nil, http2reqContext(req).Err() + } } } if err != nil { @@ -6879,43 +6907,60 @@ func (t *http2Transport) CloseIdleConnections() { } var ( - http2errClientConnClosed = errors.New("http2: client conn is closed") - http2errClientConnUnusable = errors.New("http2: client conn not usable") - - http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY") - http2errClientConnGotGoAwayAfterSomeReqBody = errors.New("http2: Transport received Server's graceful shutdown GOAWAY; some request body already written") + http2errClientConnClosed = errors.New("http2: client conn is closed") + http2errClientConnUnusable = errors.New("http2: client conn not usable") + http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY") ) +// afterReqBodyWriteError is a wrapper around errors returned by ClientConn.RoundTrip. +// It is used to signal that err happened after part of Request.Body was sent to the server. +type http2afterReqBodyWriteError struct { + err error +} + +func (e http2afterReqBodyWriteError) Error() string { + return e.err.Error() + "; some request body already written" +} + // shouldRetryRequest is called by RoundTrip when a request fails to get // response headers. It is always called with a non-nil error. // It returns either a request to retry (either the same request, or a // modified clone), or an error if the request can't be replayed. -func http2shouldRetryRequest(req *Request, err error) (*Request, error) { - switch err { - default: +func http2shouldRetryRequest(req *Request, err error, afterBodyWrite bool) (*Request, error) { + if !http2canRetryError(err) { return nil, err - case http2errClientConnUnusable, http2errClientConnGotGoAway: + } + if !afterBodyWrite { + return req, nil + } + // If the Body is nil (or http.NoBody), it's safe to reuse + // this request and its Body. + if req.Body == nil || http2reqBodyIsNoBody(req.Body) { return req, nil - case http2errClientConnGotGoAwayAfterSomeReqBody: - // If the Body is nil (or http.NoBody), it's safe to reuse - // this request and its Body. - if req.Body == nil || http2reqBodyIsNoBody(req.Body) { - return req, nil - } - // Otherwise we depend on the Request having its GetBody - // func defined. - getBody := http2reqGetBody(req) // Go 1.8: getBody = req.GetBody - if getBody == nil { - return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error") - } - body, err := getBody() - if err != nil { - return nil, err - } - newReq := *req - newReq.Body = body - return &newReq, nil } + // Otherwise we depend on the Request having its GetBody + // func defined. + getBody := http2reqGetBody(req) // Go 1.8: getBody = req.GetBody + if getBody == nil { + return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err) + } + body, err := getBody() + if err != nil { + return nil, err + } + newReq := *req + newReq.Body = body + return &newReq, nil +} + +func http2canRetryError(err error) bool { + if err == http2errClientConnUnusable || err == http2errClientConnGotGoAway { + return true + } + if se, ok := err.(http2StreamError); ok { + return se.Code == http2ErrCodeRefusedStream + } + return false } func (t *http2Transport) dialClientConn(addr string, singleUse bool) (*http2ClientConn, error) { @@ -7079,6 +7124,8 @@ func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) { } } +// CanTakeNewRequest reports whether the connection can take a new request, +// meaning it has not been closed or received or sent a GOAWAY. func (cc *http2ClientConn) CanTakeNewRequest() bool { cc.mu.Lock() defer cc.mu.Unlock() @@ -7090,8 +7137,7 @@ func (cc *http2ClientConn) canTakeNewRequestLocked() bool { return false } return cc.goAway == nil && !cc.closed && - int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) && - cc.nextStreamID < math.MaxInt32 + int64(cc.nextStreamID)+int64(cc.pendingRequests) < math.MaxInt32 } // onIdleTimeout is called from a time.AfterFunc goroutine. It will @@ -7237,10 +7283,9 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { hasTrailers := trailers != "" cc.mu.Lock() - cc.lastActive = time.Now() - if cc.closed || !cc.canTakeNewRequestLocked() { + if err := cc.awaitOpenSlotForRequest(req); err != nil { cc.mu.Unlock() - return nil, http2errClientConnUnusable + return nil, err } body := req.Body @@ -7335,14 +7380,13 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { cs.abortRequestBodyWrite(http2errStopReqBodyWrite) } if re.err != nil { - if re.err == http2errClientConnGotGoAway { - cc.mu.Lock() - if cs.startedWrite { - re.err = http2errClientConnGotGoAwayAfterSomeReqBody - } - cc.mu.Unlock() - } + cc.mu.Lock() + afterBodyWrite := cs.startedWrite + cc.mu.Unlock() cc.forgetStreamID(cs.ID) + if afterBodyWrite { + return nil, http2afterReqBodyWriteError{re.err} + } return nil, re.err } res.Request = req @@ -7355,31 +7399,31 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { case re := <-readLoopResCh: return handleReadLoopResponse(re) case <-respHeaderTimer: - cc.forgetStreamID(cs.ID) if !hasBody || bodyWritten { cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) } else { bodyWriter.cancel() cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) } + cc.forgetStreamID(cs.ID) return nil, http2errTimeout case <-ctx.Done(): - cc.forgetStreamID(cs.ID) if !hasBody || bodyWritten { cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) } else { bodyWriter.cancel() cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) } + cc.forgetStreamID(cs.ID) return nil, ctx.Err() case <-req.Cancel: - cc.forgetStreamID(cs.ID) if !hasBody || bodyWritten { cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) } else { bodyWriter.cancel() cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) } + cc.forgetStreamID(cs.ID) return nil, http2errRequestCanceled case <-cs.peerReset: // processResetStream already removed the @@ -7406,6 +7450,45 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { } } +// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams. +// Must hold cc.mu. +func (cc *http2ClientConn) awaitOpenSlotForRequest(req *Request) error { + var waitingForConn chan struct{} + var waitingForConnErr error // guarded by cc.mu + for { + cc.lastActive = time.Now() + if cc.closed || !cc.canTakeNewRequestLocked() { + return http2errClientConnUnusable + } + if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) { + if waitingForConn != nil { + close(waitingForConn) + } + return nil + } + // Unfortunately, we cannot wait on a condition variable and channel at + // the same time, so instead, we spin up a goroutine to check if the + // request is canceled while we wait for a slot to open in the connection. + if waitingForConn == nil { + waitingForConn = make(chan struct{}) + go func() { + if err := http2awaitRequestCancel(req, waitingForConn); err != nil { + cc.mu.Lock() + waitingForConnErr = err + cc.cond.Broadcast() + cc.mu.Unlock() + } + }() + } + cc.pendingRequests++ + cc.cond.Wait() + cc.pendingRequests-- + if waitingForConnErr != nil { + return waitingForConnErr + } + } +} + // requires cc.wmu be held func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error { first := true // first frame written (HEADERS is first, then CONTINUATION) @@ -7765,7 +7848,9 @@ func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStr cc.idleTimer.Reset(cc.idleTimeout) } close(cs.done) - cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl + // Wake up checkResetOrDone via clientStream.awaitFlowControl and + // wake up RoundTrip if there is a pending request. + cc.cond.Broadcast() } return cs } @@ -7864,8 +7949,9 @@ func (rl *http2clientConnReadLoop) run() error { cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) } if se, ok := err.(http2StreamError); ok { - if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil { + if cs := cc.streamByID(se.StreamID, false); cs != nil { cs.cc.writeStreamReset(cs.ID, se.Code, err) + cs.cc.forgetStreamID(cs.ID) if se.Cause == nil { se.Cause = cc.fr.errDetail } @@ -8187,6 +8273,7 @@ func (b http2transportResponseBody) Close() error { } cs.bufPipe.BreakWithError(http2errClosedResponseBody) + cc.forgetStreamID(cs.ID) return nil }