From f251708a733bfca46899d57baab7d2601f6d7057 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Tue, 6 Dec 2016 18:42:01 +0000 Subject: [PATCH] net/http: update bundled http2 Updates bundled x/net/http2 to git rev 8dab9293 for: http2: make Transport retry on server's GOAWAY graceful shutdown https://golang.org/cl/33971 Fixes #18083 Change-Id: I676f5eb4b490a4d86356778bb17296c451f16d90 Reviewed-on: https://go-review.googlesource.com/34011 Run-TryBot: Brad Fitzpatrick TryBot-Result: Gobot Gobot Reviewed-by: Tom Bergan --- src/net/http/h2_bundle.go | 87 +++++++++++++++++++++++++++++++++++---- 1 file changed, 78 insertions(+), 9 deletions(-) diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index bb7f05df2e..fd899034a7 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -2188,6 +2188,14 @@ func http2shouldLogPanic(panicValue interface{}) bool { return panicValue != nil && panicValue != ErrAbortHandler } +func http2reqGetBody(req *Request) func() (io.ReadCloser, error) { + return req.GetBody +} + +func http2reqBodyIsNoBody(body io.ReadCloser) bool { + return body == NoBody +} + var http2DebugGoroutines = os.Getenv("DEBUG_HTTP2_GOROUTINES") == "1" type http2goroutineLock uint64 @@ -3247,6 +3255,11 @@ func (sc *http2serverConn) maxHeaderListSize() uint32 { return uint32(n + typicalHeaders*perFieldOverhead) } +func (sc *http2serverConn) curOpenStreams() uint32 { + sc.serveG.check() + return sc.curClientStreams + sc.curPushedStreams +} + // stream represents a stream. This is the minimal metadata needed by // the serve goroutine. Most of the actual stream state is owned by // the http.Handler's goroutine in the responseWriter. Because the @@ -3560,7 +3573,7 @@ func (sc *http2serverConn) serve() { fn(loopNum) } - if sc.inGoAway && sc.curClientStreams == 0 && !sc.needToSendGoAway && !sc.writingFrame { + if sc.inGoAway && sc.curOpenStreams() == 0 && !sc.needToSendGoAway && !sc.writingFrame { return } } @@ -4373,7 +4386,7 @@ func (sc *http2serverConn) newStream(id, pusherID uint32, state http2streamState } else { sc.curClientStreams++ } - if sc.curClientStreams+sc.curPushedStreams == 1 { + if sc.curOpenStreams() == 1 { sc.setConnState(StateActive) } @@ -5114,7 +5127,7 @@ func (w *http2responseWriter) push(target string, opts http2pushOptions) error { } for k := range opts.Header { if strings.HasPrefix(k, ":") { - return fmt.Errorf("promised request headers cannot include psuedo header %q", k) + return fmt.Errorf("promised request headers cannot include pseudo header %q", k) } switch strings.ToLower(k) { @@ -5510,6 +5523,7 @@ type http2clientStream struct { ID uint32 resc chan http2resAndError bufPipe http2pipe // buffered pipe with the flow-controlled response payload + startedWrite bool // started request body write; guarded by cc.mu requestedGzip bool on100 func() // optional code to run if get a 100 continue response @@ -5651,8 +5665,10 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res } http2traceGotConn(req, cc) res, err := cc.RoundTrip(req) - if http2shouldRetryRequest(req, err) { - continue + if err != nil { + if req, err = http2shouldRetryRequest(req, err); err == nil { + continue + } } if err != nil { t.vlogf("RoundTrip failure: %v", err) @@ -5674,11 +5690,39 @@ func (t *http2Transport) CloseIdleConnections() { var ( http2errClientConnClosed = errors.New("http2: client conn is closed") http2errClientConnUnusable = errors.New("http2: client conn not usable") + + http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY") + http2errClientConnGotGoAwayAfterSomeReqBody = errors.New("http2: Transport received Server's graceful shutdown GOAWAY; some request body already written") ) -func http2shouldRetryRequest(req *Request, err error) bool { +// shouldRetryRequest is called by RoundTrip when a request fails to get +// response headers. It is always called with a non-nil error. +// It returns either a request to retry (either the same request, or a +// modified clone), or an error if the request can't be replayed. +func http2shouldRetryRequest(req *Request, err error) (*Request, error) { + switch err { + default: + return nil, err + case http2errClientConnUnusable, http2errClientConnGotGoAway: + return req, nil + case http2errClientConnGotGoAwayAfterSomeReqBody: + + if req.Body == nil || http2reqBodyIsNoBody(req.Body) { + return req, nil + } - return err == http2errClientConnUnusable + getBody := http2reqGetBody(req) + if getBody == nil { + return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error") + } + body, err := getBody() + if err != nil { + return nil, err + } + newReq := *req + newReq.Body = body + return &newReq, nil + } } func (t *http2Transport) dialClientConn(addr string, singleUse bool) (*http2ClientConn, error) { @@ -5826,6 +5870,15 @@ func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) { if old != nil && old.ErrCode != http2ErrCodeNo { cc.goAway.ErrCode = old.ErrCode } + last := f.LastStreamID + for streamID, cs := range cc.streams { + if streamID > last { + select { + case cs.resc <- http2resAndError{err: http2errClientConnGotGoAway}: + default: + } + } + } } func (cc *http2ClientConn) CanTakeNewRequest() bool { @@ -6059,6 +6112,13 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { cs.abortRequestBodyWrite(http2errStopReqBodyWrite) } if re.err != nil { + if re.err == http2errClientConnGotGoAway { + cc.mu.Lock() + if cs.startedWrite { + re.err = http2errClientConnGotGoAwayAfterSomeReqBody + } + cc.mu.Unlock() + } cc.forgetStreamID(cs.ID) return nil, re.err } @@ -7225,6 +7285,9 @@ func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reade resc := make(chan error, 1) s.resc = resc s.fn = func() { + cs.cc.mu.Lock() + cs.startedWrite = true + cs.cc.mu.Unlock() resc <- cs.writeRequestBody(body, cs.req.Body) } s.delay = t.expectContinueTimeout() @@ -7644,7 +7707,9 @@ type http2WriteScheduler interface { // https://tools.ietf.org/html/rfc7540#section-5.1 AdjustStream(streamID uint32, priority http2PriorityParam) - // Push queues a frame in the scheduler. + // Push queues a frame in the scheduler. In most cases, this will not be + // called with wr.StreamID()!=0 unless that stream is currently open. The one + // exception is RST_STREAM frames, which may be sent on idle or closed streams. Push(wr http2FrameWriteRequest) // Pop dequeues the next frame to write. Returns false if no frames can @@ -8183,7 +8248,11 @@ func (ws *http2priorityWriteScheduler) Push(wr http2FrameWriteRequest) { } else { n = ws.nodes[id] if n == nil { - panic("add on non-open stream") + + if wr.DataSize() > 0 { + panic("add DATA on non-open stream") + } + n = &ws.root } } n.q.push(wr) -- 2.50.0