"io/ioutil"
"log"
"math"
+ mathrand "math/rand"
"net"
"net/http/httptrace"
"net/textproto"
goAwayDebug string // goAway frame's debug data, retained as a string
streams map[uint32]*http2clientStream // client-initiated
nextStreamID uint32
+ pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
pings map[[8]byte]chan struct{} // in flight ping data to notification channel
bw *bufio.Writer
br *bufio.Reader
resTrailer *Header // client's Response.Trailer
}
-// awaitRequestCancel runs in its own goroutine and waits for the user
-// to cancel a RoundTrip request, its context to expire, or for the
-// request to be done (any way it might be removed from the cc.streams
-// map: peer reset, successful completion, TCP connection breakage,
-// etc)
-func (cs *http2clientStream) awaitRequestCancel(req *Request) {
+// awaitRequestCancel waits for the user to cancel a request or for the done
+// channel to be signaled. A non-nil error is returned only if the request was
+// canceled.
+func http2awaitRequestCancel(req *Request, done <-chan struct{}) error {
ctx := http2reqContext(req)
if req.Cancel == nil && ctx.Done() == nil {
- return
+ return nil
}
select {
case <-req.Cancel:
- cs.cancelStream()
- cs.bufPipe.CloseWithError(http2errRequestCanceled)
+ return http2errRequestCanceled
case <-ctx.Done():
+ return ctx.Err()
+ case <-done:
+ return nil
+ }
+}
+
+// awaitRequestCancel waits for the user to cancel a request, its context to
+// expire, or for the request to be done (any way it might be removed from the
+// cc.streams map: peer reset, successful completion, TCP connection breakage,
+// etc). If the request is canceled, then cs will be canceled and closed.
+func (cs *http2clientStream) awaitRequestCancel(req *Request) {
+ if err := http2awaitRequestCancel(req, cs.done); err != nil {
cs.cancelStream()
- cs.bufPipe.CloseWithError(ctx.Err())
- case <-cs.done:
+ cs.bufPipe.CloseWithError(err)
}
}
func (cs *http2clientStream) cancelStream() {
- cs.cc.mu.Lock()
+ cc := cs.cc
+ cc.mu.Lock()
didReset := cs.didReset
cs.didReset = true
- cs.cc.mu.Unlock()
+ cc.mu.Unlock()
if !didReset {
- cs.cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
+ cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
+ cc.forgetStreamID(cs.ID)
}
}
}
addr := http2authorityAddr(req.URL.Scheme, req.URL.Host)
- for {
+ for retry := 0; ; retry++ {
cc, err := t.connPool().GetClientConn(req, addr)
if err != nil {
t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
}
http2traceGotConn(req, cc)
res, err := cc.RoundTrip(req)
- if err != nil {
- if req, err = http2shouldRetryRequest(req, err); err == nil {
- continue
+ if err != nil && retry <= 6 {
+ afterBodyWrite := false
+ if e, ok := err.(http2afterReqBodyWriteError); ok {
+ err = e
+ afterBodyWrite = true
+ }
+ if req, err = http2shouldRetryRequest(req, err, afterBodyWrite); err == nil {
+ // After the first retry, do exponential backoff with 10% jitter.
+ if retry == 0 {
+ continue
+ }
+ backoff := float64(uint(1) << (uint(retry) - 1))
+ backoff += backoff * (0.1 * mathrand.Float64())
+ select {
+ case <-time.After(time.Second * time.Duration(backoff)):
+ continue
+ case <-http2reqContext(req).Done():
+ return nil, http2reqContext(req).Err()
+ }
}
}
if err != nil {
}
var (
- http2errClientConnClosed = errors.New("http2: client conn is closed")
- http2errClientConnUnusable = errors.New("http2: client conn not usable")
-
- http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
- http2errClientConnGotGoAwayAfterSomeReqBody = errors.New("http2: Transport received Server's graceful shutdown GOAWAY; some request body already written")
+ http2errClientConnClosed = errors.New("http2: client conn is closed")
+ http2errClientConnUnusable = errors.New("http2: client conn not usable")
+ http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
)
+// afterReqBodyWriteError is a wrapper around errors returned by ClientConn.RoundTrip.
+// It is used to signal that err happened after part of Request.Body was sent to the server.
+type http2afterReqBodyWriteError struct {
+ err error
+}
+
+func (e http2afterReqBodyWriteError) Error() string {
+ return e.err.Error() + "; some request body already written"
+}
+
// shouldRetryRequest is called by RoundTrip when a request fails to get
// response headers. It is always called with a non-nil error.
// It returns either a request to retry (either the same request, or a
// modified clone), or an error if the request can't be replayed.
-func http2shouldRetryRequest(req *Request, err error) (*Request, error) {
- switch err {
- default:
+func http2shouldRetryRequest(req *Request, err error, afterBodyWrite bool) (*Request, error) {
+ if !http2canRetryError(err) {
return nil, err
- case http2errClientConnUnusable, http2errClientConnGotGoAway:
+ }
+ if !afterBodyWrite {
+ return req, nil
+ }
+ // If the Body is nil (or http.NoBody), it's safe to reuse
+ // this request and its Body.
+ if req.Body == nil || http2reqBodyIsNoBody(req.Body) {
return req, nil
- case http2errClientConnGotGoAwayAfterSomeReqBody:
- // If the Body is nil (or http.NoBody), it's safe to reuse
- // this request and its Body.
- if req.Body == nil || http2reqBodyIsNoBody(req.Body) {
- return req, nil
- }
- // Otherwise we depend on the Request having its GetBody
- // func defined.
- getBody := http2reqGetBody(req) // Go 1.8: getBody = req.GetBody
- if getBody == nil {
- return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error")
- }
- body, err := getBody()
- if err != nil {
- return nil, err
- }
- newReq := *req
- newReq.Body = body
- return &newReq, nil
}
+ // Otherwise we depend on the Request having its GetBody
+ // func defined.
+ getBody := http2reqGetBody(req) // Go 1.8: getBody = req.GetBody
+ if getBody == nil {
+ return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
+ }
+ body, err := getBody()
+ if err != nil {
+ return nil, err
+ }
+ newReq := *req
+ newReq.Body = body
+ return &newReq, nil
+}
+
+func http2canRetryError(err error) bool {
+ if err == http2errClientConnUnusable || err == http2errClientConnGotGoAway {
+ return true
+ }
+ if se, ok := err.(http2StreamError); ok {
+ return se.Code == http2ErrCodeRefusedStream
+ }
+ return false
}
func (t *http2Transport) dialClientConn(addr string, singleUse bool) (*http2ClientConn, error) {
}
}
+// CanTakeNewRequest reports whether the connection can take a new request,
+// meaning it has not been closed or received or sent a GOAWAY.
func (cc *http2ClientConn) CanTakeNewRequest() bool {
cc.mu.Lock()
defer cc.mu.Unlock()
return false
}
return cc.goAway == nil && !cc.closed &&
- int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
- cc.nextStreamID < math.MaxInt32
+ int64(cc.nextStreamID)+int64(cc.pendingRequests) < math.MaxInt32
}
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
hasTrailers := trailers != ""
cc.mu.Lock()
- cc.lastActive = time.Now()
- if cc.closed || !cc.canTakeNewRequestLocked() {
+ if err := cc.awaitOpenSlotForRequest(req); err != nil {
cc.mu.Unlock()
- return nil, http2errClientConnUnusable
+ return nil, err
}
body := req.Body
cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
}
if re.err != nil {
- if re.err == http2errClientConnGotGoAway {
- cc.mu.Lock()
- if cs.startedWrite {
- re.err = http2errClientConnGotGoAwayAfterSomeReqBody
- }
- cc.mu.Unlock()
- }
+ cc.mu.Lock()
+ afterBodyWrite := cs.startedWrite
+ cc.mu.Unlock()
cc.forgetStreamID(cs.ID)
+ if afterBodyWrite {
+ return nil, http2afterReqBodyWriteError{re.err}
+ }
return nil, re.err
}
res.Request = req
case re := <-readLoopResCh:
return handleReadLoopResponse(re)
case <-respHeaderTimer:
- cc.forgetStreamID(cs.ID)
if !hasBody || bodyWritten {
cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
} else {
bodyWriter.cancel()
cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel)
}
+ cc.forgetStreamID(cs.ID)
return nil, http2errTimeout
case <-ctx.Done():
- cc.forgetStreamID(cs.ID)
if !hasBody || bodyWritten {
cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
} else {
bodyWriter.cancel()
cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel)
}
+ cc.forgetStreamID(cs.ID)
return nil, ctx.Err()
case <-req.Cancel:
- cc.forgetStreamID(cs.ID)
if !hasBody || bodyWritten {
cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
} else {
bodyWriter.cancel()
cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel)
}
+ cc.forgetStreamID(cs.ID)
return nil, http2errRequestCanceled
case <-cs.peerReset:
// processResetStream already removed the
}
}
+// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams.
+// Must hold cc.mu.
+func (cc *http2ClientConn) awaitOpenSlotForRequest(req *Request) error {
+ var waitingForConn chan struct{}
+ var waitingForConnErr error // guarded by cc.mu
+ for {
+ cc.lastActive = time.Now()
+ if cc.closed || !cc.canTakeNewRequestLocked() {
+ return http2errClientConnUnusable
+ }
+ if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
+ if waitingForConn != nil {
+ close(waitingForConn)
+ }
+ return nil
+ }
+ // Unfortunately, we cannot wait on a condition variable and channel at
+ // the same time, so instead, we spin up a goroutine to check if the
+ // request is canceled while we wait for a slot to open in the connection.
+ if waitingForConn == nil {
+ waitingForConn = make(chan struct{})
+ go func() {
+ if err := http2awaitRequestCancel(req, waitingForConn); err != nil {
+ cc.mu.Lock()
+ waitingForConnErr = err
+ cc.cond.Broadcast()
+ cc.mu.Unlock()
+ }
+ }()
+ }
+ cc.pendingRequests++
+ cc.cond.Wait()
+ cc.pendingRequests--
+ if waitingForConnErr != nil {
+ return waitingForConnErr
+ }
+ }
+}
+
// requires cc.wmu be held
func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error {
first := true // first frame written (HEADERS is first, then CONTINUATION)
cc.idleTimer.Reset(cc.idleTimeout)
}
close(cs.done)
- cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
+ // Wake up checkResetOrDone via clientStream.awaitFlowControl and
+ // wake up RoundTrip if there is a pending request.
+ cc.cond.Broadcast()
}
return cs
}
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
}
if se, ok := err.(http2StreamError); ok {
- if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil {
+ if cs := cc.streamByID(se.StreamID, false); cs != nil {
cs.cc.writeStreamReset(cs.ID, se.Code, err)
+ cs.cc.forgetStreamID(cs.ID)
if se.Cause == nil {
se.Cause = cc.fr.errDetail
}
}
cs.bufPipe.BreakWithError(http2errClosedResponseBody)
+ cc.forgetStreamID(cs.ID)
return nil
}