return body == NoBody
}
+func http2configureServer19(s *Server, conf *http2Server) error {
+ s.RegisterOnShutdown(conf.state.startGracefulShutdown)
+ return nil
+}
+
var http2DebugGoroutines = os.Getenv("DEBUG_HTTP2_GOROUTINES") == "1"
type http2goroutineLock uint64
// underlying buffer is an interface. (io.Pipe is always unbuffered)
type http2pipe struct {
mu sync.Mutex
- c sync.Cond // c.L lazily initialized to &p.mu
- b http2pipeBuffer
- err error // read error once empty. non-nil means closed.
- breakErr error // immediate read error (caller doesn't see rest of b)
- donec chan struct{} // closed on error
- readFn func() // optional code to run in Read before error
+ c sync.Cond // c.L lazily initialized to &p.mu
+ b http2pipeBuffer // nil when done reading
+ err error // read error once empty. non-nil means closed.
+ breakErr error // immediate read error (caller doesn't see rest of b)
+ donec chan struct{} // closed on error
+ readFn func() // optional code to run in Read before error
}
type http2pipeBuffer interface {
func (p *http2pipe) Len() int {
p.mu.Lock()
defer p.mu.Unlock()
+ if p.b == nil {
+ return 0
+ }
return p.b.Len()
}
p.readFn()
p.readFn = nil
}
+ p.b = nil
return 0, p.err
}
p.c.Wait()
if p.err != nil {
return 0, http2errClosedPipeWrite
}
+ if p.breakErr != nil {
+ return len(d), nil
+ }
return p.b.Write(d)
}
return
}
p.readFn = fn
+ if dst == &p.breakErr {
+ p.b = nil
+ }
*dst = err
p.closeDoneLocked()
}
// NewWriteScheduler constructs a write scheduler for a connection.
// If nil, a default scheduler is chosen.
NewWriteScheduler func() http2WriteScheduler
+
+ // Internal state. This is a pointer (rather than embedded directly)
+ // so that we don't embed a Mutex in this struct, which will make the
+ // struct non-copyable, which might break some callers.
+ state *http2serverInternalState
}
func (s *http2Server) initialConnRecvWindowSize() int32 {
return http2defaultMaxStreams
}
+type http2serverInternalState struct {
+ mu sync.Mutex
+ activeConns map[*http2serverConn]struct{}
+}
+
+func (s *http2serverInternalState) registerConn(sc *http2serverConn) {
+ if s == nil {
+ return
+ }
+ s.mu.Lock()
+ s.activeConns[sc] = struct{}{}
+ s.mu.Unlock()
+}
+
+func (s *http2serverInternalState) unregisterConn(sc *http2serverConn) {
+ if s == nil {
+ return
+ }
+ s.mu.Lock()
+ delete(s.activeConns, sc)
+ s.mu.Unlock()
+}
+
+func (s *http2serverInternalState) startGracefulShutdown() {
+ if s == nil {
+ return
+ }
+ s.mu.Lock()
+ for sc := range s.activeConns {
+ sc.startGracefulShutdown()
+ }
+ s.mu.Unlock()
+}
+
// ConfigureServer adds HTTP/2 support to a net/http Server.
//
// The configuration conf may be nil.
if conf == nil {
conf = new(http2Server)
}
+ conf.state = &http2serverInternalState{activeConns: make(map[*http2serverConn]struct{})}
if err := http2configureServer18(s, conf); err != nil {
return err
}
+ if err := http2configureServer19(s, conf); err != nil {
+ return err
+ }
if s.TLSConfig == nil {
s.TLSConfig = new(tls.Config)
streams: make(map[uint32]*http2stream),
readFrameCh: make(chan http2readFrameResult),
wantWriteFrameCh: make(chan http2FrameWriteRequest, 8),
- wantStartPushCh: make(chan http2startPushRequest, 8),
+ serveMsgCh: make(chan interface{}, 8),
wroteFrameCh: make(chan http2frameWriteResult, 1),
bodyReadCh: make(chan http2bodyReadMsg),
doneServing: make(chan struct{}),
pushEnabled: true,
}
+ s.state.registerConn(sc)
+ defer s.state.unregisterConn(sc)
+
if sc.hs.WriteTimeout != 0 {
sc.conn.SetWriteDeadline(time.Time{})
}
doneServing chan struct{} // closed when serverConn.serve ends
readFrameCh chan http2readFrameResult // written by serverConn.readFrames
wantWriteFrameCh chan http2FrameWriteRequest // from handlers -> serve
- wantStartPushCh chan http2startPushRequest // from handlers -> serve
wroteFrameCh chan http2frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
bodyReadCh chan http2bodyReadMsg // from handlers -> serve
- testHookCh chan func(int) // code to run on the serve loop
+ serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop
flow http2flow // conn-wide (not stream-specific) outbound flow control
inflow http2flow // conn-wide inbound flow control
tlsState *tls.ConnectionState // shared by all handlers, like net/http
inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
needToSendGoAway bool // we need to schedule a GOAWAY frame write
goAwayCode http2ErrCode
- shutdownTimerCh <-chan time.Time // nil until used
- shutdownTimer *time.Timer // nil until used
- idleTimer *time.Timer // nil if unused
- idleTimerCh <-chan time.Time // nil if unused
+ shutdownTimer *time.Timer // nil until used
+ idleTimer *time.Timer // nil if unused
// Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer
hpackEncoder *hpack.Encoder
+
+ // Used by startGracefulShutdown.
+ shutdownOnce sync.Once
}
func (sc *http2serverConn) maxHeaderListSize() uint32 {
sc.setConnState(StateIdle)
if sc.srv.IdleTimeout != 0 {
- sc.idleTimer = time.NewTimer(sc.srv.IdleTimeout)
+ sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
defer sc.idleTimer.Stop()
- sc.idleTimerCh = sc.idleTimer.C
- }
-
- var gracefulShutdownCh <-chan struct{}
- if sc.hs != nil {
- gracefulShutdownCh = http2h1ServerShutdownChan(sc.hs)
}
go sc.readFrames()
- settingsTimer := time.NewTimer(http2firstSettingsTimeout)
+ settingsTimer := time.AfterFunc(http2firstSettingsTimeout, sc.onSettingsTimer)
+ defer settingsTimer.Stop()
+
loopNum := 0
for {
loopNum++
break
}
sc.writeFrame(wr)
- case spr := <-sc.wantStartPushCh:
- sc.startPush(spr)
case res := <-sc.wroteFrameCh:
sc.wroteFrame(res)
case res := <-sc.readFrameCh:
return
}
res.readMore()
- if settingsTimer.C != nil {
+ if settingsTimer != nil {
settingsTimer.Stop()
- settingsTimer.C = nil
+ settingsTimer = nil
}
case m := <-sc.bodyReadCh:
sc.noteBodyRead(m.st, m.n)
- case <-settingsTimer.C:
- sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
- return
- case <-gracefulShutdownCh:
- gracefulShutdownCh = nil
- sc.startGracefulShutdown()
- case <-sc.shutdownTimerCh:
- sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
- return
- case <-sc.idleTimerCh:
- sc.vlogf("connection is idle")
- sc.goAway(http2ErrCodeNo)
- case fn := <-sc.testHookCh:
- fn(loopNum)
+ case msg := <-sc.serveMsgCh:
+ switch v := msg.(type) {
+ case func(int):
+ v(loopNum)
+ case *http2serverMessage:
+ switch v {
+ case http2settingsTimerMsg:
+ sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
+ return
+ case http2idleTimerMsg:
+ sc.vlogf("connection is idle")
+ sc.goAway(http2ErrCodeNo)
+ case http2shutdownTimerMsg:
+ sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
+ return
+ case http2gracefulShutdownMsg:
+ sc.startGracefulShutdownInternal()
+ default:
+ panic("unknown timer")
+ }
+ case *http2startPushRequest:
+ sc.startPush(v)
+ default:
+ panic(fmt.Sprintf("unexpected type %T", v))
+ }
}
if sc.inGoAway && sc.curOpenStreams() == 0 && !sc.needToSendGoAway && !sc.writingFrame {
}
}
+func (sc *http2serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) {
+ select {
+ case <-sc.doneServing:
+ case <-sharedCh:
+ close(privateCh)
+ }
+}
+
+type http2serverMessage int
+
+// Message values sent to serveMsgCh.
+var (
+ http2settingsTimerMsg = new(http2serverMessage)
+ http2idleTimerMsg = new(http2serverMessage)
+ http2shutdownTimerMsg = new(http2serverMessage)
+ http2gracefulShutdownMsg = new(http2serverMessage)
+)
+
+func (sc *http2serverConn) onSettingsTimer() { sc.sendServeMsg(http2settingsTimerMsg) }
+
+func (sc *http2serverConn) onIdleTimer() { sc.sendServeMsg(http2idleTimerMsg) }
+
+func (sc *http2serverConn) onShutdownTimer() { sc.sendServeMsg(http2shutdownTimerMsg) }
+
+func (sc *http2serverConn) sendServeMsg(msg interface{}) {
+ sc.serveG.checkNotOn()
+ select {
+ case sc.serveMsgCh <- msg:
+ case <-sc.doneServing:
+ }
+}
+
// readPreface reads the ClientPreface greeting from the peer
// or returns an error on timeout or an invalid greeting.
func (sc *http2serverConn) readPreface() error {
sc.inFrameScheduleLoop = false
}
-// startGracefulShutdown sends a GOAWAY with ErrCodeNo to tell the
-// client we're gracefully shutting down. The connection isn't closed
-// until all current streams are done.
+// startGracefulShutdown gracefully shuts down a connection. This
+// sends GOAWAY with ErrCodeNo to tell the client we're gracefully
+// shutting down. The connection isn't closed until all current
+// streams are done.
+//
+// startGracefulShutdown returns immediately; it does not wait until
+// the connection has shut down.
func (sc *http2serverConn) startGracefulShutdown() {
+ sc.serveG.checkNotOn()
+ sc.shutdownOnce.Do(func() { sc.sendServeMsg(http2gracefulShutdownMsg) })
+}
+
+func (sc *http2serverConn) startGracefulShutdownInternal() {
sc.goAwayIn(http2ErrCodeNo, 0)
}
func (sc *http2serverConn) shutDownIn(d time.Duration) {
sc.serveG.check()
- sc.shutdownTimer = time.NewTimer(d)
- sc.shutdownTimerCh = sc.shutdownTimer.C
+ sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
}
func (sc *http2serverConn) resetStream(se http2StreamError) {
sc.idleTimer.Reset(sc.srv.IdleTimeout)
}
if http2h1ServerKeepAlivesDisabled(sc.hs) {
- sc.startGracefulShutdown()
+ sc.startGracefulShutdownInternal()
}
}
if p := st.body; p != nil {
} else {
sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f)
}
- sc.startGracefulShutdown()
+ sc.startGracefulShutdownInternal()
sc.pushEnabled = false
return nil
return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
}
- msg := http2startPushRequest{
+ msg := &http2startPushRequest{
parent: st,
method: opts.Method,
url: u,
return http2errClientDisconnected
case <-st.cw:
return http2errStreamClosed
- case sc.wantStartPushCh <- msg:
+ case sc.serveMsgCh <- msg:
}
select {
done chan error
}
-func (sc *http2serverConn) startPush(msg http2startPushRequest) {
+func (sc *http2serverConn) startPush(msg *http2startPushRequest) {
sc.serveG.check()
if msg.parent.state != http2stateOpen && msg.parent.state != http2stateHalfClosedRemote {
}
if sc.maxPushPromiseID+2 >= 1<<31 {
- sc.startGracefulShutdown()
+ sc.startGracefulShutdownInternal()
return 0, http2ErrPushLimitReached
}
sc.maxPushPromiseID += 2
"Www-Authenticate": true,
}
-// h1ServerShutdownChan returns a channel that will be closed when the
-// provided *http.Server wants to shut down.
-//
-// This is a somewhat hacky way to get at http1 innards. It works
-// when the http2 code is bundled into the net/http package in the
-// standard library. The alternatives ended up making the cmd/go tool
-// depend on http Servers. This is the lightest option for now.
-// This is tested via the TestServeShutdown* tests in net/http.
-func http2h1ServerShutdownChan(hs *Server) <-chan struct{} {
- if fn := http2testh1ServerShutdownChan; fn != nil {
- return fn(hs)
- }
- var x interface{} = hs
- type I interface {
- getDoneChan() <-chan struct{}
- }
- if hs, ok := x.(I); ok {
- return hs.getDoneChan()
- }
- return nil
-}
-
-// optional test hook for h1ServerShutdownChan.
-var http2testh1ServerShutdownChan func(hs *Server) <-chan struct{}
-
// h1ServerKeepAlivesDisabled reports whether hs has its keep-alives
// disabled. See comments on h1ServerShutdownChan above for why
// the code is written this way.
cc.wmu.Lock()
if !serverSentStreamEnd {
cc.fr.WriteRSTStream(cs.ID, http2ErrCodeCancel)
+ cs.didReset = true
}
if unread > 0 {
return nil
}
if f.Length > 0 {
- if len(data) > 0 && cs.bufPipe.b == nil {
-
- cc.logf("http2: Transport received DATA frame for closed stream; closing connection")
- return http2ConnectionError(http2ErrCodeProtocol)
- }
cc.mu.Lock()
if cs.inflow.available() >= int32(f.Length) {