From e6e6cad632f6192a146bbc586a62f2e34c96c2bf Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Wed, 24 May 2017 00:05:04 +0000 Subject: [PATCH] net/http: update bundled x/net/http2 This updates the bundled copy of x/net/http2 to x/net git rev a8e8f92cd6 for: http2: remove extra goroutine stack from awaitGracefulShutdown https://golang.org/cl/43230 http2: Discard DATA frames from the server after the response body is closed https://golang.org/cl/43810 Fixes #20302 Fixes #18471 Fixes #20448 Change-Id: I00972836deb2fe6049f631ee44901732a641b171 Reviewed-on: https://go-review.googlesource.com/44006 Reviewed-by: Tom Bergan --- src/net/http/h2_bundle.go | 237 +++++++++++++++++++++++++------------- 1 file changed, 157 insertions(+), 80 deletions(-) diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index a89f9e8736..6b8e9b4cdc 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -2933,6 +2933,11 @@ func http2reqBodyIsNoBody(body io.ReadCloser) bool { return body == NoBody } +func http2configureServer19(s *Server, conf *http2Server) error { + s.RegisterOnShutdown(conf.state.startGracefulShutdown) + return nil +} + var http2DebugGoroutines = os.Getenv("DEBUG_HTTP2_GOROUTINES") == "1" type http2goroutineLock uint64 @@ -3515,12 +3520,12 @@ func http2validPseudoPath(v string) bool { // underlying buffer is an interface. (io.Pipe is always unbuffered) type http2pipe struct { mu sync.Mutex - c sync.Cond // c.L lazily initialized to &p.mu - b http2pipeBuffer - err error // read error once empty. non-nil means closed. - breakErr error // immediate read error (caller doesn't see rest of b) - donec chan struct{} // closed on error - readFn func() // optional code to run in Read before error + c sync.Cond // c.L lazily initialized to &p.mu + b http2pipeBuffer // nil when done reading + err error // read error once empty. non-nil means closed. + breakErr error // immediate read error (caller doesn't see rest of b) + donec chan struct{} // closed on error + readFn func() // optional code to run in Read before error } type http2pipeBuffer interface { @@ -3532,6 +3537,9 @@ type http2pipeBuffer interface { func (p *http2pipe) Len() int { p.mu.Lock() defer p.mu.Unlock() + if p.b == nil { + return 0 + } return p.b.Len() } @@ -3555,6 +3563,7 @@ func (p *http2pipe) Read(d []byte) (n int, err error) { p.readFn() p.readFn = nil } + p.b = nil return 0, p.err } p.c.Wait() @@ -3575,6 +3584,9 @@ func (p *http2pipe) Write(d []byte) (n int, err error) { if p.err != nil { return 0, http2errClosedPipeWrite } + if p.breakErr != nil { + return len(d), nil + } return p.b.Write(d) } @@ -3609,6 +3621,9 @@ func (p *http2pipe) closeWithError(dst *error, err error, fn func()) { return } p.readFn = fn + if dst == &p.breakErr { + p.b = nil + } *dst = err p.closeDoneLocked() } @@ -3728,6 +3743,11 @@ type http2Server struct { // NewWriteScheduler constructs a write scheduler for a connection. // If nil, a default scheduler is chosen. NewWriteScheduler func() http2WriteScheduler + + // Internal state. This is a pointer (rather than embedded directly) + // so that we don't embed a Mutex in this struct, which will make the + // struct non-copyable, which might break some callers. + state *http2serverInternalState } func (s *http2Server) initialConnRecvWindowSize() int32 { @@ -3758,6 +3778,40 @@ func (s *http2Server) maxConcurrentStreams() uint32 { return http2defaultMaxStreams } +type http2serverInternalState struct { + mu sync.Mutex + activeConns map[*http2serverConn]struct{} +} + +func (s *http2serverInternalState) registerConn(sc *http2serverConn) { + if s == nil { + return + } + s.mu.Lock() + s.activeConns[sc] = struct{}{} + s.mu.Unlock() +} + +func (s *http2serverInternalState) unregisterConn(sc *http2serverConn) { + if s == nil { + return + } + s.mu.Lock() + delete(s.activeConns, sc) + s.mu.Unlock() +} + +func (s *http2serverInternalState) startGracefulShutdown() { + if s == nil { + return + } + s.mu.Lock() + for sc := range s.activeConns { + sc.startGracefulShutdown() + } + s.mu.Unlock() +} + // ConfigureServer adds HTTP/2 support to a net/http Server. // // The configuration conf may be nil. @@ -3770,9 +3824,13 @@ func http2ConfigureServer(s *Server, conf *http2Server) error { if conf == nil { conf = new(http2Server) } + conf.state = &http2serverInternalState{activeConns: make(map[*http2serverConn]struct{})} if err := http2configureServer18(s, conf); err != nil { return err } + if err := http2configureServer19(s, conf); err != nil { + return err + } if s.TLSConfig == nil { s.TLSConfig = new(tls.Config) @@ -3887,7 +3945,7 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) { streams: make(map[uint32]*http2stream), readFrameCh: make(chan http2readFrameResult), wantWriteFrameCh: make(chan http2FrameWriteRequest, 8), - wantStartPushCh: make(chan http2startPushRequest, 8), + serveMsgCh: make(chan interface{}, 8), wroteFrameCh: make(chan http2frameWriteResult, 1), bodyReadCh: make(chan http2bodyReadMsg), doneServing: make(chan struct{}), @@ -3900,6 +3958,9 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) { pushEnabled: true, } + s.state.registerConn(sc) + defer s.state.unregisterConn(sc) + if sc.hs.WriteTimeout != 0 { sc.conn.SetWriteDeadline(time.Time{}) } @@ -3966,10 +4027,9 @@ type http2serverConn struct { doneServing chan struct{} // closed when serverConn.serve ends readFrameCh chan http2readFrameResult // written by serverConn.readFrames wantWriteFrameCh chan http2FrameWriteRequest // from handlers -> serve - wantStartPushCh chan http2startPushRequest // from handlers -> serve wroteFrameCh chan http2frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes bodyReadCh chan http2bodyReadMsg // from handlers -> serve - testHookCh chan func(int) // code to run on the serve loop + serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop flow http2flow // conn-wide (not stream-specific) outbound flow control inflow http2flow // conn-wide inbound flow control tlsState *tls.ConnectionState // shared by all handlers, like net/http @@ -4001,14 +4061,15 @@ type http2serverConn struct { inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop needToSendGoAway bool // we need to schedule a GOAWAY frame write goAwayCode http2ErrCode - shutdownTimerCh <-chan time.Time // nil until used - shutdownTimer *time.Timer // nil until used - idleTimer *time.Timer // nil if unused - idleTimerCh <-chan time.Time // nil if unused + shutdownTimer *time.Timer // nil until used + idleTimer *time.Timer // nil if unused // Owned by the writeFrameAsync goroutine: headerWriteBuf bytes.Buffer hpackEncoder *hpack.Encoder + + // Used by startGracefulShutdown. + shutdownOnce sync.Once } func (sc *http2serverConn) maxHeaderListSize() uint32 { @@ -4295,19 +4356,15 @@ func (sc *http2serverConn) serve() { sc.setConnState(StateIdle) if sc.srv.IdleTimeout != 0 { - sc.idleTimer = time.NewTimer(sc.srv.IdleTimeout) + sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer) defer sc.idleTimer.Stop() - sc.idleTimerCh = sc.idleTimer.C - } - - var gracefulShutdownCh <-chan struct{} - if sc.hs != nil { - gracefulShutdownCh = http2h1ServerShutdownChan(sc.hs) } go sc.readFrames() - settingsTimer := time.NewTimer(http2firstSettingsTimeout) + settingsTimer := time.AfterFunc(http2firstSettingsTimeout, sc.onSettingsTimer) + defer settingsTimer.Stop() + loopNum := 0 for { loopNum++ @@ -4318,8 +4375,6 @@ func (sc *http2serverConn) serve() { break } sc.writeFrame(wr) - case spr := <-sc.wantStartPushCh: - sc.startPush(spr) case res := <-sc.wroteFrameCh: sc.wroteFrame(res) case res := <-sc.readFrameCh: @@ -4327,26 +4382,37 @@ func (sc *http2serverConn) serve() { return } res.readMore() - if settingsTimer.C != nil { + if settingsTimer != nil { settingsTimer.Stop() - settingsTimer.C = nil + settingsTimer = nil } case m := <-sc.bodyReadCh: sc.noteBodyRead(m.st, m.n) - case <-settingsTimer.C: - sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr()) - return - case <-gracefulShutdownCh: - gracefulShutdownCh = nil - sc.startGracefulShutdown() - case <-sc.shutdownTimerCh: - sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr()) - return - case <-sc.idleTimerCh: - sc.vlogf("connection is idle") - sc.goAway(http2ErrCodeNo) - case fn := <-sc.testHookCh: - fn(loopNum) + case msg := <-sc.serveMsgCh: + switch v := msg.(type) { + case func(int): + v(loopNum) + case *http2serverMessage: + switch v { + case http2settingsTimerMsg: + sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr()) + return + case http2idleTimerMsg: + sc.vlogf("connection is idle") + sc.goAway(http2ErrCodeNo) + case http2shutdownTimerMsg: + sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr()) + return + case http2gracefulShutdownMsg: + sc.startGracefulShutdownInternal() + default: + panic("unknown timer") + } + case *http2startPushRequest: + sc.startPush(v) + default: + panic(fmt.Sprintf("unexpected type %T", v)) + } } if sc.inGoAway && sc.curOpenStreams() == 0 && !sc.needToSendGoAway && !sc.writingFrame { @@ -4355,6 +4421,38 @@ func (sc *http2serverConn) serve() { } } +func (sc *http2serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) { + select { + case <-sc.doneServing: + case <-sharedCh: + close(privateCh) + } +} + +type http2serverMessage int + +// Message values sent to serveMsgCh. +var ( + http2settingsTimerMsg = new(http2serverMessage) + http2idleTimerMsg = new(http2serverMessage) + http2shutdownTimerMsg = new(http2serverMessage) + http2gracefulShutdownMsg = new(http2serverMessage) +) + +func (sc *http2serverConn) onSettingsTimer() { sc.sendServeMsg(http2settingsTimerMsg) } + +func (sc *http2serverConn) onIdleTimer() { sc.sendServeMsg(http2idleTimerMsg) } + +func (sc *http2serverConn) onShutdownTimer() { sc.sendServeMsg(http2shutdownTimerMsg) } + +func (sc *http2serverConn) sendServeMsg(msg interface{}) { + sc.serveG.checkNotOn() + select { + case sc.serveMsgCh <- msg: + case <-sc.doneServing: + } +} + // readPreface reads the ClientPreface greeting from the peer // or returns an error on timeout or an invalid greeting. func (sc *http2serverConn) readPreface() error { @@ -4630,10 +4728,19 @@ func (sc *http2serverConn) scheduleFrameWrite() { sc.inFrameScheduleLoop = false } -// startGracefulShutdown sends a GOAWAY with ErrCodeNo to tell the -// client we're gracefully shutting down. The connection isn't closed -// until all current streams are done. +// startGracefulShutdown gracefully shuts down a connection. This +// sends GOAWAY with ErrCodeNo to tell the client we're gracefully +// shutting down. The connection isn't closed until all current +// streams are done. +// +// startGracefulShutdown returns immediately; it does not wait until +// the connection has shut down. func (sc *http2serverConn) startGracefulShutdown() { + sc.serveG.checkNotOn() + sc.shutdownOnce.Do(func() { sc.sendServeMsg(http2gracefulShutdownMsg) }) +} + +func (sc *http2serverConn) startGracefulShutdownInternal() { sc.goAwayIn(http2ErrCodeNo, 0) } @@ -4665,8 +4772,7 @@ func (sc *http2serverConn) goAwayIn(code http2ErrCode, forceCloseIn time.Duratio func (sc *http2serverConn) shutDownIn(d time.Duration) { sc.serveG.check() - sc.shutdownTimer = time.NewTimer(d) - sc.shutdownTimerCh = sc.shutdownTimer.C + sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer) } func (sc *http2serverConn) resetStream(se http2StreamError) { @@ -4839,7 +4945,7 @@ func (sc *http2serverConn) closeStream(st *http2stream, err error) { sc.idleTimer.Reset(sc.srv.IdleTimeout) } if http2h1ServerKeepAlivesDisabled(sc.hs) { - sc.startGracefulShutdown() + sc.startGracefulShutdownInternal() } } if p := st.body; p != nil { @@ -4988,7 +5094,7 @@ func (sc *http2serverConn) processGoAway(f *http2GoAwayFrame) error { } else { sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f) } - sc.startGracefulShutdown() + sc.startGracefulShutdownInternal() sc.pushEnabled = false return nil @@ -5918,7 +6024,7 @@ func (w *http2responseWriter) push(target string, opts http2pushOptions) error { return fmt.Errorf("method %q must be GET or HEAD", opts.Method) } - msg := http2startPushRequest{ + msg := &http2startPushRequest{ parent: st, method: opts.Method, url: u, @@ -5931,7 +6037,7 @@ func (w *http2responseWriter) push(target string, opts http2pushOptions) error { return http2errClientDisconnected case <-st.cw: return http2errStreamClosed - case sc.wantStartPushCh <- msg: + case sc.serveMsgCh <- msg: } select { @@ -5953,7 +6059,7 @@ type http2startPushRequest struct { done chan error } -func (sc *http2serverConn) startPush(msg http2startPushRequest) { +func (sc *http2serverConn) startPush(msg *http2startPushRequest) { sc.serveG.check() if msg.parent.state != http2stateOpen && msg.parent.state != http2stateHalfClosedRemote { @@ -5979,7 +6085,7 @@ func (sc *http2serverConn) startPush(msg http2startPushRequest) { } if sc.maxPushPromiseID+2 >= 1<<31 { - sc.startGracefulShutdown() + sc.startGracefulShutdownInternal() return 0, http2ErrPushLimitReached } sc.maxPushPromiseID += 2 @@ -6099,31 +6205,6 @@ var http2badTrailer = map[string]bool{ "Www-Authenticate": true, } -// h1ServerShutdownChan returns a channel that will be closed when the -// provided *http.Server wants to shut down. -// -// This is a somewhat hacky way to get at http1 innards. It works -// when the http2 code is bundled into the net/http package in the -// standard library. The alternatives ended up making the cmd/go tool -// depend on http Servers. This is the lightest option for now. -// This is tested via the TestServeShutdown* tests in net/http. -func http2h1ServerShutdownChan(hs *Server) <-chan struct{} { - if fn := http2testh1ServerShutdownChan; fn != nil { - return fn(hs) - } - var x interface{} = hs - type I interface { - getDoneChan() <-chan struct{} - } - if hs, ok := x.(I); ok { - return hs.getDoneChan() - } - return nil -} - -// optional test hook for h1ServerShutdownChan. -var http2testh1ServerShutdownChan func(hs *Server) <-chan struct{} - // h1ServerKeepAlivesDisabled reports whether hs has its keep-alives // disabled. See comments on h1ServerShutdownChan above for why // the code is written this way. @@ -7681,6 +7762,7 @@ func (b http2transportResponseBody) Close() error { cc.wmu.Lock() if !serverSentStreamEnd { cc.fr.WriteRSTStream(cs.ID, http2ErrCodeCancel) + cs.didReset = true } if unread > 0 { @@ -7723,11 +7805,6 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { return nil } if f.Length > 0 { - if len(data) > 0 && cs.bufPipe.b == nil { - - cc.logf("http2: Transport received DATA frame for closed stream; closing connection") - return http2ConnectionError(http2ErrCodeProtocol) - } cc.mu.Lock() if cs.inflow.available() >= int32(f.Length) { -- 2.50.0