// requires p.mu is held.
func (p *http2clientConnPool) getStartDialLocked(addr string) *http2dialCall {
if call, ok := p.dialing[addr]; ok {
-
+ // A dial is already in-flight. Don't start another.
return call
}
call := &http2dialCall{p: p, done: make(chan struct{})}
func (p *http2clientConnPool) closeIdleConnections() {
p.mu.Lock()
defer p.mu.Unlock()
-
+ // TODO: don't close a cc if it was just added to the pool
+ // milliseconds ago and has never been used. There's currently
+ // a small race window with the HTTP/1 Transport's integration
+ // where it can add an idle conn just before using it, and
+ // somebody else can concurrently call CloseIdleConns and
+ // break some caller's RoundTrip.
for _, vv := range p.conns {
for _, cc := range vv {
cc.closeIfIdle()
out = append(out, v)
}
}
-
+ // If we filtered it out, zero out the last item to prevent
+ // the GC from seeing it.
if len(in) != len(out) {
in[len(in)-1] = nil
}
go c.Close()
return http2erringRoundTripper{err}
} else if !used {
-
+ // Turns out we don't need this c.
+ // For example, two goroutines made requests to the same host
+ // at the same time, both kicking off TCP dials. (since protocol
+ // was unknown)
go c.Close()
}
return t2
ntotal += n
b.r += n
b.size -= n
-
+ // If the first chunk has been consumed, advance to the next chunk.
if b.r == len(b.chunks[0]) {
http2putDataBufferChunk(b.chunks[0])
end := len(b.chunks) - 1
func (b *http2dataBuffer) Write(p []byte) (int, error) {
ntotal := len(p)
for len(p) > 0 {
-
+ // If the last chunk is empty, allocate a new chunk. Try to allocate
+ // enough to fully copy p plus any additional bytes we expect to
+ // receive. However, this may allocate less than len(p).
want := int64(len(p))
if b.expected > want {
want = b.expected
// If the limit is hit, MetaHeadersFrame.Truncated is set true.
MaxHeaderListSize uint32
+ // TODO: track which type of frame & with which flags was sent
+ // last. Then return an error (unless AllowIllegalWrites) if
+ // we're in the middle of a header block and a
+ // non-Continuation or Continuation on a different stream is
+ // attempted to be written.
+
logReads, logWrites bool
debugFramer *http2Framer // only use for logging written writes
func (fr *http2Framer) maxHeaderListSize() uint32 {
if fr.MaxHeaderListSize == 0 {
- return 16 << 20
+ return 16 << 20 // sane default, per docs
}
return fr.MaxHeaderListSize
}
func (f *http2Framer) startWrite(ftype http2FrameType, flags http2Flags, streamID uint32) {
-
+ // Write the FrameHeader.
f.wbuf = append(f.wbuf[:0],
- 0,
+ 0, // 3 bytes of length, filled in in endWrite
0,
0,
byte(ftype),
}
func (f *http2Framer) endWrite() error {
-
+ // Now that we know the final size, fill in the FrameHeader in
+ // the space previously reserved for it. Abuse append.
length := len(f.wbuf) - http2frameHeaderLen
if length >= (1 << 24) {
return http2ErrFrameTooLarge
if f.debugFramer == nil {
f.debugFramerBuf = new(bytes.Buffer)
f.debugFramer = http2NewFramer(nil, f.debugFramerBuf)
- f.debugFramer.logReads = false
-
+ f.debugFramer.logReads = false // we log it ourselves, saying "wrote" below
+ // Let us read anything, even if we accidentally wrote it
+ // in the wrong order:
f.debugFramer.AllowIllegalReads = true
}
f.debugFramerBuf.Write(f.wbuf)
func http2parseDataFrame(fc *http2frameCache, fh http2FrameHeader, payload []byte) (http2Frame, error) {
if fh.StreamID == 0 {
-
+ // DATA frames MUST be associated with a stream. If a
+ // DATA frame is received whose stream identifier
+ // field is 0x0, the recipient MUST respond with a
+ // connection error (Section 5.4.1) of type
+ // PROTOCOL_ERROR.
return nil, http2connError{http2ErrCodeProtocol, "DATA frame with stream ID 0"}
}
f := fc.getDataFrame()
}
}
if int(padSize) > len(payload) {
-
+ // If the length of the padding is greater than the
+ // length of the frame payload, the recipient MUST
+ // treat this as a connection error.
+ // Filed: https://github.com/http2/http2-spec/issues/610
return nil, http2connError{http2ErrCodeProtocol, "pad size larger than data payload"}
}
f.data = payload[:len(payload)-int(padSize)]
if !f.AllowIllegalWrites {
for _, b := range pad {
if b != 0 {
-
+ // "Padding octets MUST be set to zero when sending."
return http2errPadBytes
}
}
func http2parseSettingsFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) {
if fh.Flags.Has(http2FlagSettingsAck) && fh.Length > 0 {
-
+ // When this (ACK 0x1) bit is set, the payload of the
+ // SETTINGS frame MUST be empty. Receipt of a
+ // SETTINGS frame with the ACK flag set and a length
+ // field value other than 0 MUST be treated as a
+ // connection error (Section 5.4.1) of type
+ // FRAME_SIZE_ERROR.
return nil, http2ConnectionError(http2ErrCodeFrameSize)
}
if fh.StreamID != 0 {
-
+ // SETTINGS frames always apply to a connection,
+ // never a single stream. The stream identifier for a
+ // SETTINGS frame MUST be zero (0x0). If an endpoint
+ // receives a SETTINGS frame whose stream identifier
+ // field is anything other than 0x0, the endpoint MUST
+ // respond with a connection error (Section 5.4.1) of
+ // type PROTOCOL_ERROR.
return nil, http2ConnectionError(http2ErrCodeProtocol)
}
if len(p)%6 != 0 {
-
+ // Expecting even number of 6 byte settings.
return nil, http2ConnectionError(http2ErrCodeFrameSize)
}
f := &http2SettingsFrame{http2FrameHeader: fh, p: p}
if v, ok := f.Value(http2SettingInitialWindowSize); ok && v > (1<<31)-1 {
-
+ // Values above the maximum flow control window size of 2^31 - 1 MUST
+ // be treated as a connection error (Section 5.4.1) of type
+ // FLOW_CONTROL_ERROR.
return nil, http2ConnectionError(http2ErrCodeFlowControl)
}
return f, nil
if len(p) != 4 {
return nil, http2ConnectionError(http2ErrCodeFrameSize)
}
- inc := binary.BigEndian.Uint32(p[:4]) & 0x7fffffff
+ inc := binary.BigEndian.Uint32(p[:4]) & 0x7fffffff // mask off high reserved bit
if inc == 0 {
-
+ // A receiver MUST treat the receipt of a
+ // WINDOW_UPDATE frame with an flow control window
+ // increment of 0 as a stream error (Section 5.4.2) of
+ // type PROTOCOL_ERROR; errors on the connection flow
+ // control window MUST be treated as a connection
+ // error (Section 5.4.1).
if fh.StreamID == 0 {
return nil, http2ConnectionError(http2ErrCodeProtocol)
}
// If the Stream ID is zero, the window update applies to the
// connection as a whole.
func (f *http2Framer) WriteWindowUpdate(streamID, incr uint32) error {
-
+ // "The legal range for the increment to the flow control window is 1 to 2^31-1 (2,147,483,647) octets."
if (incr < 1 || incr > 2147483647) && !f.AllowIllegalWrites {
return errors.New("illegal window increment value")
}
http2FrameHeader: fh,
}
if fh.StreamID == 0 {
-
+ // HEADERS frames MUST be associated with a stream. If a HEADERS frame
+ // is received whose stream identifier field is 0x0, the recipient MUST
+ // respond with a connection error (Section 5.4.1) of type
+ // PROTOCOL_ERROR.
return nil, http2connError{http2ErrCodeProtocol, "HEADERS frame with stream ID 0"}
}
var padLength uint8
return nil, err
}
hf.Priority.StreamDep = v & 0x7fffffff
- hf.Priority.Exclusive = (v != hf.Priority.StreamDep)
+ hf.Priority.Exclusive = (v != hf.Priority.StreamDep) // high bit was set
p, hf.Priority.Weight, err = http2readByte(p)
if err != nil {
return nil, err
return nil, http2connError{http2ErrCodeFrameSize, fmt.Sprintf("PRIORITY frame payload size was %d; want 5", len(payload))}
}
v := binary.BigEndian.Uint32(payload[:4])
- streamID := v & 0x7fffffff
+ streamID := v & 0x7fffffff // mask off high bit
return &http2PriorityFrame{
http2FrameHeader: fh,
http2PriorityParam: http2PriorityParam{
Weight: payload[4],
StreamDep: streamID,
- Exclusive: streamID != v,
+ Exclusive: streamID != v, // was high bit set?
},
}, nil
}
http2FrameHeader: fh,
}
if pp.StreamID == 0 {
-
+ // PUSH_PROMISE frames MUST be associated with an existing,
+ // peer-initiated stream. The stream identifier of a
+ // PUSH_PROMISE frame indicates the stream it is associated
+ // with. If the stream identifier field specifies the value
+ // 0x0, a recipient MUST respond with a connection error
+ // (Section 5.4.1) of type PROTOCOL_ERROR.
return nil, http2ConnectionError(http2ErrCodeProtocol)
}
// The PUSH_PROMISE frame includes optional padding.
pp.PromiseID = pp.PromiseID & (1<<31 - 1)
if int(padLength) > len(p) {
-
+ // like the DATA frame, error out if padding is longer than the body.
return nil, http2ConnectionError(http2ErrCodeProtocol)
}
pp.headerFragBuf = p[:len(p)-int(padLength)]
default:
return http2pseudoHeaderError(hf.Name)
}
-
+ // Check for duplicates.
+ // This would be a bad algorithm, but N is 4.
+ // And this doesn't allocate.
for _, hf2 := range pf[:i] {
if hf.Name == hf2.Name {
return http2duplicatePseudoHeaderError(hf.Name)
if uint32(int(v)) == v {
return int(v)
}
-
+ // They had a crazy big number for MaxHeaderBytes anyway,
+ // so give them unlimited header lengths:
return 0
}
mh.Fields = append(mh.Fields, hf)
})
-
+ // Lose reference to MetaHeadersFrame:
defer hdec.SetEmitFunc(func(hf hpack.HeaderField) {})
var hc http2headersOrContinuation = hf
if f, err := fr.ReadFrame(); err != nil {
return nil, err
} else {
- hc = f.(*http2ContinuationFrame)
+ hc = f.(*http2ContinuationFrame) // guaranteed by checkFrameOrder
}
}
return nil
})
if n > 0 {
- buf.Truncate(buf.Len() - 1)
+ buf.Truncate(buf.Len() - 1) // remove trailing comma
}
case *http2DataFrame:
data := f.Data()
func http2cloneTLSConfig(c *tls.Config) *tls.Config {
c2 := c.Clone()
- c2.GetClientCertificate = c.GetClientCertificate
+ c2.GetClientCertificate = c.GetClientCertificate // golang.org/issue/19264
return c2
}
defer http2littleBuf.Put(bp)
b := *bp
b = b[:runtime.Stack(b, false)]
-
+ // Parse the 4707 out of "goroutine 4707 ["
b = bytes.TrimPrefix(b, http2goroutineSpace)
i := bytes.IndexByte(b, ' ')
if i < 0 {
goto Error
case 2 <= base && base <= 36:
+ // valid base; nothing to do
case base == 0:
-
+ // Look for octal, hex prefix.
switch {
case s[0] == '0' && len(s) > 1 && (s[1] == 'x' || s[1] == 'X'):
base = 16
}
if n >= cutoff {
-
+ // n*base overflows
n = 1<<64 - 1
err = strconv.ErrRange
goto Error
n1 := n + uint64(v)
if n1 < n || n1 > maxVal {
-
+ // n+v overflows
n = 1<<64 - 1
err = strconv.ErrRange
goto Error
// Valid reports whether the setting is valid.
func (s http2Setting) Valid() error {
-
+ // Limits and error codes from 6.5.2 Defined SETTINGS Parameters
switch s.ID {
case http2SettingEnablePush:
if s.Val != 1 && s.Val != 0 {
}
func (s *http2sorter) SortStrings(ss []string) {
-
+ // Our sorter works on s.v, which sorter owns, so
+ // stash it away while we sort the user's buffer.
save := s.v
s.v = ss
sort.Sort(s)
}
if p.err != nil {
if p.readFn != nil {
- p.readFn()
- p.readFn = nil
+ p.readFn() // e.g. copy trailers
+ p.readFn = nil // not sticky like p.err
}
p.b = nil
return 0, p.err
return 0, http2errClosedPipeWrite
}
if p.breakErr != nil {
- return len(d), nil
+ return len(d), nil // discard when there is no reader
}
return p.b.Write(d)
}
}
defer p.c.Signal()
if *dst != nil {
-
+ // Already been done.
return
}
p.readFn = fn
if p.donec == nil {
return
}
-
+ // Close if unclosed. This isn't racy since we always
+ // hold p.mu while closing.
select {
case <-p.donec:
default:
if p.donec == nil {
p.donec = make(chan struct{})
if p.err != nil || p.breakErr != nil {
-
+ // Already hit an error.
p.closeDoneLocked()
}
}
func (s *http2serverInternalState) registerConn(sc *http2serverConn) {
if s == nil {
- return
+ return // if the Server was used without calling ConfigureServer
}
s.mu.Lock()
s.activeConns[sc] = struct{}{}
func (s *http2serverInternalState) unregisterConn(sc *http2serverConn) {
if s == nil {
- return
+ return // if the Server was used without calling ConfigureServer
}
s.mu.Lock()
delete(s.activeConns, sc)
func (s *http2serverInternalState) startGracefulShutdown() {
if s == nil {
- return
+ return // if the Server was used without calling ConfigureServer
}
s.mu.Lock()
for sc := range s.activeConns {
}
}
+ // Note: not setting MinVersion to tls.VersionTLS12,
+ // as we don't want to interfere with HTTP/1.1 traffic
+ // on the user's server. We enforce TLS 1.2 later once
+ // we accept a connection. Ideally this should be done
+ // during next-proto selection, but using TLS <1.2 with
+ // HTTP/2 is still the client's bug.
+
s.TLSConfig.PreferServerCipherSuites = true
haveNPN := false
readFrameCh: make(chan http2readFrameResult),
wantWriteFrameCh: make(chan http2FrameWriteRequest, 8),
serveMsgCh: make(chan interface{}, 8),
- wroteFrameCh: make(chan http2frameWriteResult, 1),
- bodyReadCh: make(chan http2bodyReadMsg),
+ wroteFrameCh: make(chan http2frameWriteResult, 1), // buffered; one send in writeFrameAsync
+ bodyReadCh: make(chan http2bodyReadMsg), // buffering doesn't matter either way
doneServing: make(chan struct{}),
- clientMaxStreams: math.MaxUint32,
+ clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
advMaxStreams: s.maxConcurrentStreams(),
initialStreamSendWindowSize: http2initialWindowSize,
maxFrameSize: http2initialMaxFrameSize,
s.state.registerConn(sc)
defer s.state.unregisterConn(sc)
+ // The net/http package sets the write deadline from the
+ // http.Server.WriteTimeout during the TLS handshake, but then
+ // passes the connection off to us with the deadline already set.
+ // Write deadlines are set per stream in serverConn.newStream.
+ // Disarm the net.Conn write deadline here.
if sc.hs.WriteTimeout != 0 {
sc.conn.SetWriteDeadline(time.Time{})
}
sc.writeSched = http2NewRandomWriteScheduler()
}
+ // These start at the RFC-specified defaults. If there is a higher
+ // configured value for inflow, that will be updated when we send a
+ // WINDOW_UPDATE shortly after sending SETTINGS.
sc.flow.add(http2initialWindowSize)
sc.inflow.add(http2initialWindowSize)
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
if tc, ok := c.(http2connectionStater); ok {
sc.tlsState = new(tls.ConnectionState)
*sc.tlsState = tc.ConnectionState()
-
+ // 9.2 Use of TLS Features
+ // An implementation of HTTP/2 over TLS MUST use TLS
+ // 1.2 or higher with the restrictions on feature set
+ // and cipher suite described in this section. Due to
+ // implementation limitations, it might not be
+ // possible to fail TLS negotiation. An endpoint MUST
+ // immediately terminate an HTTP/2 connection that
+ // does not meet the TLS requirements described in
+ // this section with a connection error (Section
+ // 5.4.1) of type INADEQUATE_SECURITY.
if sc.tlsState.Version < tls.VersionTLS12 {
sc.rejectConn(http2ErrCodeInadequateSecurity, "TLS version too low")
return
}
if sc.tlsState.ServerName == "" {
-
+ // Client must use SNI, but we don't enforce that anymore,
+ // since it was causing problems when connecting to bare IP
+ // addresses during development.
+ //
+ // TODO: optionally enforce? Or enforce at the time we receive
+ // a new request, and verify the the ServerName matches the :authority?
+ // But that precludes proxy situations, perhaps.
+ //
+ // So for now, do nothing here again.
}
if !s.PermitProhibitedCipherSuites && http2isBadCipher(sc.tlsState.CipherSuite) {
-
+ // "Endpoints MAY choose to generate a connection error
+ // (Section 5.4.1) of type INADEQUATE_SECURITY if one of
+ // the prohibited cipher suites are negotiated."
+ //
+ // We choose that. In my opinion, the spec is weak
+ // here. It also says both parties must support at least
+ // TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 so there's no
+ // excuses here. If we really must, we could allow an
+ // "AllowInsecureWeakCiphers" option on the server later.
+ // Let's see how it plays out first.
sc.rejectConn(http2ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite))
return
}
func (sc *http2serverConn) rejectConn(err http2ErrCode, debug string) {
sc.vlogf("http2: server rejecting conn: %v, %s", err, debug)
-
+ // ignoring errors. hanging up anyway.
sc.framer.WriteGoAway(0, err, []byte(debug))
sc.bw.Flush()
sc.conn.Close()
func (sc *http2serverConn) state(streamID uint32) (http2streamState, *http2stream) {
sc.serveG.check()
-
+ // http://tools.ietf.org/html/rfc7540#section-5.1
if st, ok := sc.streams[streamID]; ok {
return st.state, st
}
-
+ // "The first use of a new stream identifier implicitly closes all
+ // streams in the "idle" state that might have been initiated by
+ // that peer with a lower-valued stream identifier. For example, if
+ // a client sends a HEADERS frame on stream 7 without ever sending a
+ // frame on stream 5, then stream 5 transitions to the "closed"
+ // state when the first frame for stream 7 is sent or received."
if streamID%2 == 1 {
if streamID <= sc.maxClientStreamID {
return http2stateClosed, nil
return false
}
+ // TODO: remove this string search and be more like the Windows
+ // case below. That might involve modifying the standard library
+ // to return better error types.
str := err.Error()
if strings.Contains(str, "use of closed network connection") {
return true
}
+ // TODO(bradfitz): x/tools/cmd/bundle doesn't really support
+ // build tags, so I can't make an http2_windows.go file with
+ // Windows-specific stuff. Fix that and move this, once we
+ // have a way to bundle this into std's net/http somehow.
if runtime.GOOS == "windows" {
if oe, ok := err.(*net.OpError); ok && oe.Op == "read" {
if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" {
return
}
if err == io.EOF || err == io.ErrUnexpectedEOF || http2isClosedConnError(err) {
-
+ // Boring, expected errors.
sc.vlogf(format, args...)
} else {
sc.logf(format, args...)
}
func (sc *http2serverConn) notePanic() {
-
+ // Note: this is for serverConn.serve panicking, not http.Handler code.
if http2testHookOnPanicMu != nil {
http2testHookOnPanicMu.Lock()
defer http2testHookOnPanicMu.Unlock()
defer sc.conn.Close()
defer sc.closeAllStreamsOnConnClose()
defer sc.stopShutdownTimer()
- defer close(sc.doneServing)
+ defer close(sc.doneServing) // unblocks handlers trying to send
if http2VerboseLogs {
sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
})
sc.unackedSettings++
+ // Each connection starts with intialWindowSize inflow tokens.
+ // If a higher value is configured, we add more tokens.
if diff := sc.srv.initialConnRecvWindowSize() - http2initialWindowSize; diff > 0 {
sc.sendWindowUpdate(nil, int(diff))
}
sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
return
}
-
+ // Now that we've got the preface, get us out of the
+ // "StateNew" state. We can't go directly to idle, though.
+ // Active means we read some data and anticipate a request. We'll
+ // do another Active when we get a HEADERS frame.
sc.setConnState(StateActive)
sc.setConnState(StateIdle)
defer sc.idleTimer.Stop()
}
- go sc.readFrames()
+ go sc.readFrames() // closed by defer sc.conn.Close above
settingsTimer := time.AfterFunc(http2firstSettingsTimeout, sc.onSettingsTimer)
defer settingsTimer.Stop()
case msg := <-sc.serveMsgCh:
switch v := msg.(type) {
case func(int):
- v(loopNum)
+ v(loopNum) // for testing
case *http2serverMessage:
switch v {
case http2settingsTimerMsg:
func (sc *http2serverConn) onShutdownTimer() { sc.sendServeMsg(http2shutdownTimerMsg) }
func (sc *http2serverConn) sendServeMsg(msg interface{}) {
- sc.serveG.checkNotOn()
+ sc.serveG.checkNotOn() // NOT
select {
case sc.serveMsgCh <- msg:
case <-sc.doneServing:
func (sc *http2serverConn) readPreface() error {
errc := make(chan error, 1)
go func() {
-
+ // Read the client preface
buf := make([]byte, len(http2ClientPreface))
if _, err := io.ReadFull(sc.conn, buf); err != nil {
errc <- err
errc <- nil
}
}()
- timer := time.NewTimer(http2prefaceTimeout)
+ timer := time.NewTimer(http2prefaceTimeout) // TODO: configurable on *Server?
defer timer.Stop()
select {
case <-timer.C:
case <-sc.doneServing:
return http2errClientDisconnected
case <-stream.cw:
-
+ // If both ch and stream.cw were ready (as might
+ // happen on the final Write after an http.Handler
+ // ends), prefer the write result. Otherwise this
+ // might just be us successfully closing the stream.
+ // The writeFrameAsync and serve goroutines guarantee
+ // that the ch send will happen before the stream.cw
+ // close.
select {
case err = <-ch:
frameWriteDone = true
// buffered and is read by serve itself). If you're on the serve
// goroutine, call writeFrame instead.
func (sc *http2serverConn) writeFrameFromHandler(wr http2FrameWriteRequest) error {
- sc.serveG.checkNotOn()
+ sc.serveG.checkNotOn() // NOT
select {
case sc.wantWriteFrameCh <- wr:
return nil
case <-sc.doneServing:
-
+ // Serve loop is gone.
+ // Client has closed their connection to the server.
return http2errClientDisconnected
}
}
// If true, wr will not be written and wr.done will not be signaled.
var ignoreWrite bool
+ // We are not allowed to write frames on closed streams. RFC 7540 Section
+ // 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on
+ // a closed stream." Our server never sends PRIORITY, so that exception
+ // does not apply.
+ //
+ // The serverConn might close an open stream while the stream's handler
+ // is still running. For example, the server might close a stream when it
+ // receives bad data from the client. If this happens, the handler might
+ // attempt to write a frame after the stream has been closed (since the
+ // handler hasn't yet been notified of the close). In this case, we simply
+ // ignore the frame. The handler will notice that the stream is closed when
+ // it waits for the frame to be written.
+ //
+ // As an exception to this rule, we allow sending RST_STREAM after close.
+ // This allows us to immediately reject new streams without tracking any
+ // state for those streams (except for the queued RST_STREAM frame). This
+ // may result in duplicate RST_STREAMs in some cases, but the client should
+ // ignore those.
if wr.StreamID() != 0 {
_, isReset := wr.write.(http2StreamError)
if state, _ := sc.state(wr.StreamID()); state == http2stateClosed && !isReset {
}
}
+ // Don't send a 100-continue response if we've already sent headers.
+ // See golang.org/issue/14030.
switch wr.write.(type) {
case *http2writeResHeaders:
wr.stream.wroteHeaders = true
case http2write100ContinueHeadersFrame:
if wr.stream.wroteHeaders {
-
+ // We do not need to notify wr.done because this frame is
+ // never written with wr.done != nil.
if wr.done != nil {
panic("wr.done != nil for write100ContinueHeadersFrame")
}
case http2stateHalfClosedLocal:
switch wr.write.(type) {
case http2StreamError, http2handlerPanicRST, http2writeWindowUpdate:
-
+ // RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE
+ // in this state. (We never send PRIORITY from the server, so that is not checked.)
default:
panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr))
}
}
switch st.state {
case http2stateOpen:
-
+ // Here we would go to stateHalfClosedLocal in
+ // theory, but since our handler is done and
+ // the net/http package provides no mechanism
+ // for closing a ResponseWriter while still
+ // reading data (see possible TODO at top of
+ // this file), we go into closed state here
+ // anyway, after telling the peer we're
+ // hanging up on them. We'll transition to
+ // stateClosed after the RST_STREAM frame is
+ // written.
st.state = http2stateHalfClosedLocal
-
+ // Section 8.1: a server MAY request that the client abort
+ // transmission of a request without error by sending a
+ // RST_STREAM with an error code of NO_ERROR after sending
+ // a complete response.
sc.resetStream(http2streamError(st.id, http2ErrCodeNo))
case http2stateHalfClosedRemote:
sc.closeStream(st, http2errHandlerComplete)
} else {
switch v := wr.write.(type) {
case http2StreamError:
-
+ // st may be unknown if the RST_STREAM was generated to reject bad input.
if st, ok := sc.streams[v.StreamID]; ok {
sc.closeStream(st, v)
}
}
}
+ // Reply (if requested) to unblock the ServeHTTP goroutine.
wr.replyToWriter(res.err)
sc.scheduleFrameWrite()
}
if sc.needsFrameFlush {
sc.startFrameWrite(http2FrameWriteRequest{write: http2flushFrameWriter{}})
- sc.needsFrameFlush = false
+ sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
continue
}
break
// startGracefulShutdown returns immediately; it does not wait until
// the connection has shut down.
func (sc *http2serverConn) startGracefulShutdown() {
- sc.serveG.checkNotOn()
+ sc.serveG.checkNotOn() // NOT
sc.shutdownOnce.Do(func() { sc.sendServeMsg(http2gracefulShutdownMsg) })
}
if code != http2ErrCodeNo {
forceCloseIn = 250 * time.Millisecond
} else {
-
+ // TODO: configurable
forceCloseIn = 1 * time.Second
}
sc.goAwayIn(code, forceCloseIn)
if err != nil {
if err == http2ErrFrameTooLarge {
sc.goAway(http2ErrCodeFrameSize)
- return true
+ return true // goAway will close the loop
}
clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || http2isClosedConnError(err)
if clientGone {
-
+ // TODO: could we also get into this state if
+ // the peer does a half close
+ // (e.g. CloseWrite) because they're done
+ // sending frames but they're still wanting
+ // our open replies? Investigate.
+ // TODO: add CloseWrite to crypto/tls.Conn first
+ // so we have a way to test this? I suppose
+ // just for testing we could have a non-TLS mode.
return false
}
} else {
case http2ConnectionError:
sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev)
sc.goAway(http2ErrCode(ev))
- return true
+ return true // goAway will handle shutdown
default:
if res.err != nil {
sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err)
func (sc *http2serverConn) processFrame(f http2Frame) error {
sc.serveG.check()
+ // First frame received must be SETTINGS.
if !sc.sawFirstSettings {
if _, ok := f.(*http2SettingsFrame); !ok {
return http2ConnectionError(http2ErrCodeProtocol)
case *http2GoAwayFrame:
return sc.processGoAway(f)
case *http2PushPromiseFrame:
-
+ // A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
+ // frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
return http2ConnectionError(http2ErrCodeProtocol)
default:
sc.vlogf("http2: server ignoring frame: %v", f.Header())
func (sc *http2serverConn) processPing(f *http2PingFrame) error {
sc.serveG.check()
if f.IsAck() {
-
+ // 6.7 PING: " An endpoint MUST NOT respond to PING frames
+ // containing this flag."
return nil
}
if f.StreamID != 0 {
-
+ // "PING frames are not associated with any individual
+ // stream. If a PING frame is received with a stream
+ // identifier field value other than 0x0, the recipient MUST
+ // respond with a connection error (Section 5.4.1) of type
+ // PROTOCOL_ERROR."
return http2ConnectionError(http2ErrCodeProtocol)
}
if sc.inGoAway && sc.goAwayCode != http2ErrCodeNo {
func (sc *http2serverConn) processWindowUpdate(f *http2WindowUpdateFrame) error {
sc.serveG.check()
switch {
- case f.StreamID != 0:
+ case f.StreamID != 0: // stream-level flow control
state, st := sc.state(f.StreamID)
if state == http2stateIdle {
-
+ // Section 5.1: "Receiving any frame other than HEADERS
+ // or PRIORITY on a stream in this state MUST be
+ // treated as a connection error (Section 5.4.1) of
+ // type PROTOCOL_ERROR."
return http2ConnectionError(http2ErrCodeProtocol)
}
if st == nil {
-
+ // "WINDOW_UPDATE can be sent by a peer that has sent a
+ // frame bearing the END_STREAM flag. This means that a
+ // receiver could receive a WINDOW_UPDATE frame on a "half
+ // closed (remote)" or "closed" stream. A receiver MUST
+ // NOT treat this as an error, see Section 5.1."
return nil
}
if !st.flow.add(int32(f.Increment)) {
return http2streamError(f.StreamID, http2ErrCodeFlowControl)
}
- default:
+ default: // connection-level flow control
if !sc.flow.add(int32(f.Increment)) {
return http2goAwayFlowError{}
}
state, st := sc.state(f.StreamID)
if state == http2stateIdle {
-
+ // 6.4 "RST_STREAM frames MUST NOT be sent for a
+ // stream in the "idle" state. If a RST_STREAM frame
+ // identifying an idle stream is received, the
+ // recipient MUST treat this as a connection error
+ // (Section 5.4.1) of type PROTOCOL_ERROR.
return http2ConnectionError(http2ErrCodeProtocol)
}
if st != nil {
}
}
if p := st.body; p != nil {
-
+ // Return any buffered unread bytes worth of conn-level flow control.
+ // See golang.org/issue/16481
sc.sendWindowUpdate(nil, p.Len())
p.CloseWithError(err)
}
- st.cw.Close()
+ st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
sc.writeSched.CloseStream(st.id)
}
if f.IsAck() {
sc.unackedSettings--
if sc.unackedSettings < 0 {
-
+ // Why is the peer ACKing settings we never sent?
+ // The spec doesn't mention this case, but
+ // hang up on them anyway.
return http2ConnectionError(http2ErrCodeProtocol)
}
return nil
case http2SettingInitialWindowSize:
return sc.processSettingInitialWindowSize(s.Val)
case http2SettingMaxFrameSize:
- sc.maxFrameSize = int32(s.Val)
+ sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31
case http2SettingMaxHeaderListSize:
sc.peerMaxHeaderListSize = s.Val
default:
-
+ // Unknown setting: "An endpoint that receives a SETTINGS
+ // frame with any unknown or unsupported identifier MUST
+ // ignore that setting."
if http2VerboseLogs {
sc.vlogf("http2: server ignoring unknown setting %v", s)
}
func (sc *http2serverConn) processSettingInitialWindowSize(val uint32) error {
sc.serveG.check()
-
+ // Note: val already validated to be within range by
+ // processSetting's Valid call.
+
+ // "A SETTINGS frame can alter the initial flow control window
+ // size for all current streams. When the value of
+ // SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST
+ // adjust the size of all stream flow control windows that it
+ // maintains by the difference between the new value and the
+ // old value."
old := sc.initialStreamSendWindowSize
sc.initialStreamSendWindowSize = int32(val)
- growth := int32(val) - old
+ growth := int32(val) - old // may be negative
for _, st := range sc.streams {
if !st.flow.add(growth) {
-
+ // 6.9.2 Initial Flow Control Window Size
+ // "An endpoint MUST treat a change to
+ // SETTINGS_INITIAL_WINDOW_SIZE that causes any flow
+ // control window to exceed the maximum size as a
+ // connection error (Section 5.4.1) of type
+ // FLOW_CONTROL_ERROR."
return http2ConnectionError(http2ErrCodeFlowControl)
}
}
}
data := f.Data()
+ // "If a DATA frame is received whose stream is not in "open"
+ // or "half closed (local)" state, the recipient MUST respond
+ // with a stream error (Section 5.4.2) of type STREAM_CLOSED."
id := f.Header().StreamID
state, st := sc.state(id)
if id == 0 || state == http2stateIdle {
-
+ // Section 5.1: "Receiving any frame other than HEADERS
+ // or PRIORITY on a stream in this state MUST be
+ // treated as a connection error (Section 5.4.1) of
+ // type PROTOCOL_ERROR."
return http2ConnectionError(http2ErrCodeProtocol)
}
if st == nil || state != http2stateOpen || st.gotTrailerHeader || st.resetQueued {
-
+ // This includes sending a RST_STREAM if the stream is
+ // in stateHalfClosedLocal (which currently means that
+ // the http.Handler returned, so it's done reading &
+ // done writing). Try to stop the client from sending
+ // more DATA.
+
+ // But still enforce their connection-level flow control,
+ // and return any flow control bytes since we're not going
+ // to consume them.
if sc.inflow.available() < int32(f.Length) {
return http2streamError(id, http2ErrCodeFlowControl)
}
-
+ // Deduct the flow control from inflow, since we're
+ // going to immediately add it back in
+ // sendWindowUpdate, which also schedules sending the
+ // frames.
sc.inflow.take(int32(f.Length))
- sc.sendWindowUpdate(nil, int(f.Length))
+ sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
if st != nil && st.resetQueued {
-
+ // Already have a stream error in flight. Don't send another.
return nil
}
return http2streamError(id, http2ErrCodeStreamClosed)
panic("internal error: should have a body in this state")
}
+ // Sender sending more than they'd declared?
if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
return http2streamError(id, http2ErrCodeStreamClosed)
}
if f.Length > 0 {
-
+ // Check whether the client has flow control quota.
if st.inflow.available() < int32(f.Length) {
return http2streamError(id, http2ErrCodeFlowControl)
}
st.bodyBytes += int64(len(data))
}
+ // Return any padded flow control now, since we won't
+ // refund it later on body reads.
if pad := int32(f.Length) - int32(len(data)); pad > 0 {
sc.sendWindowUpdate32(nil, pad)
sc.sendWindowUpdate32(st, pad)
sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f)
}
sc.startGracefulShutdownInternal()
-
+ // http://tools.ietf.org/html/rfc7540#section-6.8
+ // We should not create any new streams, which means we should disable push.
sc.pushEnabled = false
return nil
}
func (st *http2stream) copyTrailersToHandlerRequest() {
for k, vv := range st.trailer {
if _, ok := st.reqTrailer[k]; ok {
-
+ // Only copy it over it was pre-declared.
st.reqTrailer[k] = vv
}
}
sc.serveG.check()
id := f.StreamID
if sc.inGoAway {
-
+ // Ignore.
return nil
}
-
+ // http://tools.ietf.org/html/rfc7540#section-5.1.1
+ // Streams initiated by a client MUST use odd-numbered stream
+ // identifiers. [...] An endpoint that receives an unexpected
+ // stream identifier MUST respond with a connection error
+ // (Section 5.4.1) of type PROTOCOL_ERROR.
if id%2 != 1 {
return http2ConnectionError(http2ErrCodeProtocol)
}
-
+ // A HEADERS frame can be used to create a new stream or
+ // send a trailer for an open one. If we already have a stream
+ // open, let it process its own HEADERS frame (trailers at this
+ // point, if it's valid).
if st := sc.streams[f.StreamID]; st != nil {
if st.resetQueued {
-
+ // We're sending RST_STREAM to close the stream, so don't bother
+ // processing this frame.
return nil
}
return st.processTrailerHeaders(f)
}
+ // [...] The identifier of a newly established stream MUST be
+ // numerically greater than all streams that the initiating
+ // endpoint has opened or reserved. [...] An endpoint that
+ // receives an unexpected stream identifier MUST respond with
+ // a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
if id <= sc.maxClientStreamID {
return http2ConnectionError(http2ErrCodeProtocol)
}
sc.idleTimer.Stop()
}
+ // http://tools.ietf.org/html/rfc7540#section-5.1.2
+ // [...] Endpoints MUST NOT exceed the limit set by their peer. An
+ // endpoint that receives a HEADERS frame that causes their
+ // advertised concurrent stream limit to be exceeded MUST treat
+ // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR
+ // or REFUSED_STREAM.
if sc.curClientStreams+1 > sc.advMaxStreams {
if sc.unackedSettings == 0 {
-
+ // They should know better.
return http2streamError(id, http2ErrCodeProtocol)
}
-
+ // Assume it's a network race, where they just haven't
+ // received our last SETTINGS update. But actually
+ // this can't happen yet, because we don't yet provide
+ // a way for users to adjust server parameters at
+ // runtime.
return http2streamError(id, http2ErrCodeRefusedStream)
}
if st.reqTrailer != nil {
st.trailer = make(Header)
}
- st.body = req.Body.(*http2requestBody).pipe
+ st.body = req.Body.(*http2requestBody).pipe // may be nil
st.declBodyBytes = req.ContentLength
handler := sc.handler.ServeHTTP
if f.Truncated {
-
+ // Their header list was too long. Send a 431 error.
handler = http2handleHeaderListTooLong
} else if err := http2checkValidHTTP2RequestHeaders(req.Header); err != nil {
handler = http2new400Handler(err)
}
+ // The net/http package sets the read deadline from the
+ // http.Server.ReadTimeout during the TLS handshake, but then
+ // passes the connection off to us with the deadline already
+ // set. Disarm it here after the request headers are read,
+ // similar to how the http1 server works. Here it's
+ // technically more like the http1 Server's ReadHeaderTimeout
+ // (in Go 1.8), though. That's a more sane option anyway.
if sc.hs.ReadTimeout != 0 {
sc.conn.SetReadDeadline(time.Time{})
}
for _, hf := range f.RegularFields() {
key := sc.canonicalHeader(hf.Name)
if !http2ValidTrailerHeader(key) {
-
+ // TODO: send more details to the peer somehow. But http2 has
+ // no way to send debug data at a stream level. Discuss with
+ // HTTP folk.
return http2streamError(st.id, http2ErrCodeProtocol)
}
st.trailer[key] = append(st.trailer[key], hf.Value)
func http2checkPriority(streamID uint32, p http2PriorityParam) error {
if streamID == p.StreamDep {
-
+ // Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat
+ // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR."
+ // Section 5.3.3 says that a stream can depend on one of its dependencies,
+ // so it's only self-dependencies that are forbidden.
return http2streamError(streamID, http2ErrCodeProtocol)
}
return nil
cancelCtx: cancelCtx,
}
st.cw.Init()
- st.flow.conn = &sc.flow
+ st.flow.conn = &sc.flow // link to conn-level counter
st.flow.add(sc.initialStreamSendWindowSize)
- st.inflow.conn = &sc.inflow
+ st.inflow.conn = &sc.inflow // link to conn-level counter
st.inflow.add(sc.srv.initialStreamRecvWindowSize())
if sc.hs.WriteTimeout != 0 {
st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol)
}
} else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") {
-
+ // See 8.1.2.6 Malformed Requests and Responses:
+ //
+ // Malformed requests or responses that are detected
+ // MUST be treated as a stream error (Section 5.4.2)
+ // of type PROTOCOL_ERROR."
+ //
+ // 8.1.2.3 Request Pseudo-Header Fields
+ // "All HTTP/2 requests MUST include exactly one valid
+ // value for the :method, :scheme, and :path
+ // pseudo-header fields"
return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol)
}
bodyOpen := !f.StreamEnded()
if rp.method == "HEAD" && bodyOpen {
-
+ // HEAD requests can't have bodies
return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol)
}
if needsContinue {
rp.header.Del("Expect")
}
-
+ // Merge Cookie headers into one "; "-delimited value.
if cookies := rp.header["Cookie"]; len(cookies) > 1 {
rp.header.Set("Cookie", strings.Join(cookies, "; "))
}
key = CanonicalHeaderKey(strings.TrimSpace(key))
switch key {
case "Transfer-Encoding", "Trailer", "Content-Length":
-
+ // Bogus. (copy of http1 rules)
+ // Ignore.
default:
if trailer == nil {
trailer = make(Header)
var requestURI string
if rp.method == "CONNECT" {
url_ = &url.URL{Host: rp.authority}
- requestURI = rp.authority
+ requestURI = rp.authority // mimic HTTP/1 server behavior
} else {
var err error
url_, err = url.ParseRequestURI(rp.path)
rws := http2responseWriterStatePool.Get().(*http2responseWriterState)
bwSave := rws.bw
- *rws = http2responseWriterState{}
+ *rws = http2responseWriterState{} // zero all the fields
rws.conn = sc
rws.bw = bwSave
rws.bw.Reset(http2chunkWriter{rws})
write: http2handlerPanicRST{rw.rws.stream.id},
stream: rw.rws.stream,
})
-
+ // Same as net/http:
if http2shouldLogPanic(e) {
const size = 64 << 10
buf := make([]byte, size)
// called from handler goroutines.
// h may be nil.
func (sc *http2serverConn) writeHeaders(st *http2stream, headerData *http2writeResHeaders) error {
- sc.serveG.checkNotOn()
+ sc.serveG.checkNotOn() // NOT on
var errc chan error
if headerData.h != nil {
-
+ // If there's a header map (which we don't own), so we have to block on
+ // waiting for this frame to be written, so an http.Flush mid-handler
+ // writes out the correct value of keys, before a handler later potentially
+ // mutates it.
errc = http2errChanPool.Get().(chan error)
}
if err := sc.writeFrameFromHandler(http2FrameWriteRequest{
// Notes that the handler for the given stream ID read n bytes of its body
// and schedules flow control tokens to be sent.
func (sc *http2serverConn) noteBodyReadFromHandler(st *http2stream, n int, err error) {
- sc.serveG.checkNotOn()
+ sc.serveG.checkNotOn() // NOT on
if n > 0 {
select {
case sc.bodyReadCh <- http2bodyReadMsg{st, n}:
func (sc *http2serverConn) noteBodyRead(st *http2stream, n int) {
sc.serveG.check()
- sc.sendWindowUpdate(nil, n)
+ sc.sendWindowUpdate(nil, n) // conn-level
if st.state != http2stateHalfClosedRemote && st.state != http2stateClosed {
-
+ // Don't send this WINDOW_UPDATE if the stream is closed
+ // remotely.
sc.sendWindowUpdate(st, n)
}
}
func (rws *http2responseWriterState) declareTrailer(k string) {
k = CanonicalHeaderKey(k)
if !http2ValidTrailerHeader(k) {
-
+ // Forbidden by RFC 2616 14.40.
rws.conn.logf("ignoring invalid trailer %q", k)
return
}
}
var date string
if _, ok := rws.snapHeader["Date"]; !ok {
-
+ // TODO(bradfitz): be faster here, like net/http? measure.
date = time.Now().UTC().Format(TimeFormat)
}
endStream := rws.handlerDone && !rws.hasTrailers()
if len(p) > 0 || endStream {
-
+ // only send a 0 byte DATA frame if we're ending the stream.
if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil {
return 0, err
}
}
if rws.bw.Buffered() > 0 {
if err := rws.bw.Flush(); err != nil {
-
+ // Ignore the error. The frame writer already knows.
return
}
} else {
-
+ // The bufio.Writer won't call chunkWriter.Write
+ // (writeChunk with zero bytes, so we have to do it
+ // ourselves to force the HTTP response header and/or
+ // final DATA frame (with END_STREAM) to be sent.
rws.writeChunk(nil)
}
}
rws.closeNotifierCh = ch
cw := rws.stream.cw
go func() {
- cw.Wait()
+ cw.Wait() // wait for close
ch <- true
}()
}
if !http2bodyAllowedForStatus(rws.status) {
return 0, ErrBodyNotAllowed
}
- rws.wroteBytes += int64(len(dataB)) + int64(len(dataS))
+ rws.wroteBytes += int64(len(dataB)) + int64(len(dataS)) // only one can be set
if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen {
-
+ // TODO: send a RST_STREAM
return 0, errors.New("http2: handler wrote more than declared Content-Length")
}
sc := st.sc
sc.serveG.checkNotOn()
+ // No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream."
+ // http://tools.ietf.org/html/rfc7540#section-6.6
if st.isPushed() {
return http2ErrRecursivePush
}
+ // Default options.
if opts.Method == "" {
opts.Method = "GET"
}
wantScheme = "https"
}
+ // Validate the request.
u, err := url.Parse(target)
if err != nil {
return err
if strings.HasPrefix(k, ":") {
return fmt.Errorf("promised request headers cannot include pseudo header %q", k)
}
-
+ // These headers are meaningful only if the request has a body,
+ // but PUSH_PROMISE requests cannot have a body.
+ // http://tools.ietf.org/html/rfc7540#section-8.2
+ // Also disallow Host, since the promised URL must be absolute.
switch strings.ToLower(k) {
case "content-length", "content-encoding", "trailer", "te", "expect", "host":
return fmt.Errorf("promised request headers cannot include %q", k)
return err
}
+ // The RFC effectively limits promised requests to GET and HEAD:
+ // "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]"
+ // http://tools.ietf.org/html/rfc7540#section-8.2
if opts.Method != "GET" && opts.Method != "HEAD" {
return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
}
func (sc *http2serverConn) startPush(msg *http2startPushRequest) {
sc.serveG.check()
+ // http://tools.ietf.org/html/rfc7540#section-6.6.
+ // PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
+ // is in either the "open" or "half-closed (remote)" state.
if msg.parent.state != http2stateOpen && msg.parent.state != http2stateHalfClosedRemote {
-
+ // responseWriter.Push checks that the stream is peer-initiaed.
msg.done <- http2errStreamClosed
return
}
+ // http://tools.ietf.org/html/rfc7540#section-6.6.
if !sc.pushEnabled {
msg.done <- ErrNotSupported
return
}
+ // PUSH_PROMISE frames must be sent in increasing order by stream ID, so
+ // we allocate an ID for the promised stream lazily, when the PUSH_PROMISE
+ // is written. Once the ID is allocated, we start the request handler.
allocatePromisedID := func() (uint32, error) {
sc.serveG.check()
+ // Check this again, just in case. Technically, we might have received
+ // an updated SETTINGS by the time we got around to writing this frame.
if !sc.pushEnabled {
return 0, ErrNotSupported
}
-
+ // http://tools.ietf.org/html/rfc7540#section-6.5.2.
if sc.curPushedStreams+1 > sc.clientMaxStreams {
return 0, http2ErrPushLimitReached
}
+ // http://tools.ietf.org/html/rfc7540#section-5.1.1.
+ // Streams initiated by the server MUST use even-numbered identifiers.
+ // A server that is unable to establish a new stream identifier can send a GOAWAY
+ // frame so that the client is forced to open a new connection for new streams.
if sc.maxPushPromiseID+2 >= 1<<31 {
sc.startGracefulShutdownInternal()
return 0, http2ErrPushLimitReached
sc.maxPushPromiseID += 2
promisedID := sc.maxPushPromiseID
+ // http://tools.ietf.org/html/rfc7540#section-8.2.
+ // Strictly speaking, the new stream should start in "reserved (local)", then
+ // transition to "half closed (remote)" after sending the initial HEADERS, but
+ // we start in "half closed (remote)" for simplicity.
+ // See further comments at the definition of stateHalfClosedRemote.
promised := sc.newStream(promisedID, msg.parent.id, http2stateHalfClosedRemote)
rw, req, err := sc.newWriterAndRequestNoBody(promised, http2requestParam{
method: msg.method,
scheme: msg.url.Scheme,
authority: msg.url.Host,
path: msg.url.RequestURI(),
- header: http2cloneHeader(msg.header),
+ header: http2cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE
})
if err != nil {
-
+ // Should not happen, since we've already validated msg.url.
panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
}
// It requires Go 1.6 or later and returns an error if the net/http package is too old
// or if t1 has already been HTTP/2-enabled.
func http2ConfigureTransport(t1 *Transport) error {
- _, err := http2configureTransport(t1)
+ _, err := http2configureTransport(t1) // in configure_transport.go (go1.6) or not_go16.go
return err
}
// and returns a host:port. The port 443 is added if needed.
func http2authorityAddr(scheme string, authority string) (addr string) {
host, port, err := net.SplitHostPort(authority)
- if err != nil {
+ if err != nil { // authority didn't have a port
port = "443"
if scheme == "http" {
port = "80"
if a, err := idna.ToASCII(host); err == nil {
host = a
}
-
+ // IPv6 address literal, without a port:
if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
return host + ":" + port
}
case http2errClientConnUnusable, http2errClientConnGotGoAway:
return req, nil
case http2errClientConnGotGoAwayAfterSomeReqBody:
-
+ // If the Body is nil (or http.NoBody), it's safe to reuse
+ // this request and its Body.
if req.Body == nil || http2reqBodyIsNoBody(req.Body) {
return req, nil
}
-
- getBody := http2reqGetBody(req)
+ // Otherwise we depend on the Request having its GetBody
+ // func defined.
+ getBody := http2reqGetBody(req) // Go 1.8: getBody = req.GetBody
if getBody == nil {
return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error")
}
tconn: c,
readerDone: make(chan struct{}),
nextStreamID: 1,
- maxFrameSize: 16 << 10,
- initialWindowSize: 65535,
- maxConcurrentStreams: 1000,
+ maxFrameSize: 16 << 10, // spec default
+ initialWindowSize: 65535, // spec default
+ maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
streams: make(map[uint32]*http2clientStream),
singleUse: singleUse,
wantSettingsAck: true,
cc.cond = sync.NewCond(&cc.mu)
cc.flow.add(int32(http2initialWindowSize))
+ // TODO: adjust this writer size to account for frame size +
+ // MTU + crypto/tls record padding.
cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr})
cc.br = bufio.NewReader(c)
cc.fr = http2NewFramer(cc.bw, cc.br)
cc.fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil)
cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
+ // TODO: SetMaxDynamicTableSize, SetMaxDynamicTableSizeLimit on
+ // henc in response to SETTINGS frames?
cc.henc = hpack.NewEncoder(&cc.hbuf)
if cs, ok := c.(http2connectionStater); ok {
old := cc.goAway
cc.goAway = f
+ // Merge the previous and current GoAway error frames.
if cc.goAwayDebug == "" {
cc.goAwayDebug = string(f.DebugData())
}
}
cc.closed = true
nextID := cc.nextStreamID
-
+ // TODO: do clients send GOAWAY too? maybe? Just Close:
cc.mu.Unlock()
if http2VerboseLogs {
return
}
}
-
+ // forget about it.
}
// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
if cc.t.t1 != nil {
return cc.t.t1.ResponseHeaderTimeout
}
-
+ // No way to do this (yet?) with just an http2.Transport. Probably
+ // no need. Request.Cancel this is the new way. We only need to support
+ // this for compatibility with the old http.Transport fields when
+ // we're doing transparent http2.
return 0
}
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
req.Method != "HEAD" {
-
+ // Request gzip only, not deflate. Deflate is ambiguous and
+ // not as universally supported anyway.
+ // See: http://www.gzip.org/zlib/zlib_faq.html#faq38
+ //
+ // Note that we don't request this for HEAD requests,
+ // due to a bug in nginx:
+ // http://trac.nginx.org/nginx/ticket/358
+ // https://golang.org/issue/5522
+ //
+ // We don't request gzip if the request is for a range, since
+ // auto-decoding a portion of a gzipped document will just fail
+ // anyway. See https://golang.org/issue/8923
requestedGzip = true
}
+ // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
+ // sent by writeRequestBody below, along with any Trailers,
+ // again in form HEADERS{1}, CONTINUATION{0,})
hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen)
if err != nil {
cc.mu.Unlock()
if werr != nil {
if hasBody {
- req.Body.Close()
+ req.Body.Close() // per RoundTripper contract
bodyWriter.cancel()
}
cc.forgetStreamID(cs.ID)
-
+ // Don't bother sending a RST_STREAM (our write already failed;
+ // no need to keep writing)
http2traceWroteRequest(cs.trace, werr)
return nil, werr
}
handleReadLoopResponse := func(re http2resAndError) (*Response, error) {
res := re.res
if re.err != nil || res.StatusCode > 299 {
-
+ // On error or status code 3xx, 4xx, 5xx, etc abort any
+ // ongoing write, assuming that the server doesn't care
+ // about our request body. If the server replied with 1xx or
+ // 2xx, however, then assume the server DOES potentially
+ // want our body (e.g. full-duplex streaming:
+ // golang.org/issue/13444). If it turns out the server
+ // doesn't, they'll RST_STREAM us soon enough. This is a
+ // heuristic to avoid adding knobs to Transport. Hopefully
+ // we can keep it.
bodyWriter.cancel()
cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
}
}
return nil, http2errRequestCanceled
case <-cs.peerReset:
-
+ // processResetStream already removed the
+ // stream from the streams map; no need for
+ // forgetStreamID.
return nil, cs.resetErr
case err := <-bodyWriter.resc:
-
+ // Prefer the read loop's response, if available. Issue 16102.
select {
case re := <-readLoopResCh:
return handleReadLoopResponse(re)
// requires cc.wmu be held
func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error {
- first := true
+ first := true // first frame written (HEADERS is first, then CONTINUATION)
frameSize := int(cc.maxFrameSize)
for len(hdrs) > 0 && cc.werr == nil {
chunk := hdrs
cc.fr.WriteContinuation(streamID, endHeaders, chunk)
}
}
-
+ // TODO(bradfitz): this Flush could potentially block (as
+ // could the WriteHeaders call(s) above), which means they
+ // wouldn't respond to Request.Cancel being readable. That's
+ // rare, but this should probably be in a goroutine.
cc.bw.Flush()
return cc.werr
}
func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {
cc := cs.cc
- sentEnd := false
+ sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
buf := cc.frameScratchBuffer()
defer cc.putFrameScratchBuffer(buf)
defer func() {
http2traceWroteRequest(cs.trace, err)
-
+ // TODO: write h12Compare test showing whether
+ // Request.Body is closed by the Transport,
+ // and in multiple cases: server replies <=299 and >299
+ // while still writing request body
cerr := bodyCloser.Close()
if err == nil {
err = cerr
sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
err = cc.fr.WriteData(cs.ID, sentEnd, data)
if err == nil {
-
+ // TODO(bradfitz): this flush is for latency, not bandwidth.
+ // Most requests won't need this. Make this opt-in or
+ // opt-out? Use some heuristic on the body type? Nagel-like
+ // timers? Based on 'n'? Only last chunk of this for loop,
+ // unless flow control tokens are low? For now, always.
+ // If we change this, see comment below.
err = cc.bw.Flush()
}
cc.wmu.Unlock()
}
if sentEnd {
-
+ // Already sent END_STREAM (which implies we have no
+ // trailers) and flushed, because currently all
+ // WriteData frames above get a flush. So we're done.
return nil
}
cc.wmu.Lock()
defer cc.wmu.Unlock()
+ // Two ways to send END_STREAM: either with trailers, or
+ // with an empty DATA frame.
if len(trls) > 0 {
err = cc.writeHeaders(cs.ID, true, trls)
} else {
take := a
if int(take) > maxBytes {
- take = int32(maxBytes)
+ take = int32(maxBytes) // can't truncate int; take is int32
}
if take > int32(cc.maxFrameSize) {
take = int32(cc.maxFrameSize)
}
}
+ // Check for any invalid headers and return an error before we
+ // potentially pollute our hpack state. (We want to be able to
+ // continue to reuse the hpack encoder for future requests)
for k, vv := range req.Header {
if !httplex.ValidHeaderFieldName(k) {
return nil, fmt.Errorf("invalid HTTP header name %q", k)
}
}
+ // 8.1.2.3 Request Pseudo-Header Fields
+ // The :path pseudo-header field includes the path and query parts of the
+ // target URI (the path-absolute production and optionally a '?' character
+ // followed by the query production (see Sections 3.3 and 3.4 of
+ // [RFC3986]).
cc.writeHeader(":authority", host)
cc.writeHeader(":method", req.Method)
if req.Method != "CONNECT" {
lowKey := strings.ToLower(k)
switch lowKey {
case "host", "content-length":
-
+ // Host is :authority, already sent.
+ // Content-Length is automatic, set below.
continue
case "connection", "proxy-connection", "transfer-encoding", "upgrade", "keep-alive":
-
+ // Per 8.1.2.2 Connection-Specific Header
+ // Fields, don't send connection-specific
+ // fields. We have already checked if any
+ // are error-worthy so just ignore the rest.
continue
case "user-agent":
-
+ // Match Go's http1 behavior: at most one
+ // User-Agent. If set to nil or empty string,
+ // then omit it. Otherwise if not mentioned,
+ // include the default (below).
didUA = true
if len(vv) < 1 {
continue
if contentLength < 0 {
return false
}
-
+ // For zero bodies, whether we send a content-length depends on the method.
+ // It also kinda doesn't matter for http2 either way, with END_STREAM.
switch method {
case "POST", "PUT", "PATCH":
return true
func (cc *http2ClientConn) encodeTrailers(req *Request) []byte {
cc.hbuf.Reset()
for k, vv := range req.Trailer {
-
+ // Transfer-Encoding, etc.. have already been filter at the
+ // start of RoundTrip
lowKey := strings.ToLower(k)
for _, v := range vv {
cc.writeHeader(lowKey, v)
cc.idleTimer.Reset(cc.idleTimeout)
}
close(cs.done)
- cc.cond.Broadcast()
+ cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
}
return cs
}
cc.idleTimer.Stop()
}
+ // Close any response bodies if the server closes prematurely.
+ // TODO: also do this if we've written the headers but not
+ // gotten a response yet.
err := cc.readerErr
cc.mu.Lock()
if cc.goAway != nil && http2isEOFOrNetReadError(err) {
func (rl *http2clientConnReadLoop) run() error {
cc := rl.cc
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
- gotReply := false
+ gotReply := false // ever saw a HEADERS reply
gotSettings := false
for {
f, err := cc.fr.ReadFrame()
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
}
if se, ok := err.(http2StreamError); ok {
- if cs := cc.streamByID(se.StreamID, true); cs != nil {
+ if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil {
cs.cc.writeStreamReset(cs.ID, se.Code, err)
if se.Cause == nil {
se.Cause = cc.fr.errDetail
}
gotSettings = true
}
- maybeIdle := false
+ maybeIdle := false // whether frame might transition us to idle
switch f := f.(type) {
case *http2MetaHeadersFrame:
cc := rl.cc
cs := cc.streamByID(f.StreamID, f.StreamEnded())
if cs == nil {
-
+ // We'd get here if we canceled a request while the
+ // server had its response still in flight. So if this
+ // was just something we canceled, ignore it.
return nil
}
if !cs.firstByte {
if cs.trace != nil {
-
+ // TODO(bradfitz): move first response byte earlier,
+ // when we first read the 9 byte header, not waiting
+ // until all the HEADERS+CONTINUATION frames have been
+ // merged. This works for now.
http2traceFirstResponseByte(cs.trace)
}
cs.firstByte = true
if _, ok := err.(http2ConnectionError); ok {
return err
}
-
+ // Any other error type is a stream error.
cs.cc.writeStreamReset(f.StreamID, http2ErrCodeProtocol, err)
cs.resc <- http2resAndError{err: err}
- return nil
+ return nil // return nil from process* funcs to keep conn alive
}
if res == nil {
-
+ // (nil, nil) special case. See handleResponse docs.
return nil
}
if res.Body != http2noBody {
if statusCode == 100 {
http2traceGot100Continue(cs.trace)
if cs.on100 != nil {
- cs.on100()
+ cs.on100() // forces any write delay timer to fire
}
- cs.pastHeaders = false
+ cs.pastHeaders = false // do it all again
return nil, nil
}
if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil {
res.ContentLength = clen64
} else {
-
+ // TODO: care? unlike http/1, it won't mess up our framing, so it's
+ // more safe smuggling-wise to ignore.
}
} else if len(clens) > 1 {
-
+ // TODO: care? unlike http/1, it won't mess up our framing, so it's
+ // more safe smuggling-wise to ignore.
}
}
func (rl *http2clientConnReadLoop) processTrailers(cs *http2clientStream, f *http2MetaHeadersFrame) error {
if cs.pastTrailers {
-
+ // Too many HEADERS frames for this stream.
return http2ConnectionError(http2ErrCodeProtocol)
}
cs.pastTrailers = true
if !f.StreamEnded() {
-
+ // We expect that any headers for trailers also
+ // has END_STREAM.
return http2ConnectionError(http2ErrCodeProtocol)
}
if len(f.PseudoFields()) > 0 {
-
+ // No pseudo header fields are defined for trailers.
+ // TODO: ConnectionError might be overly harsh? Check.
return http2ConnectionError(http2ErrCodeProtocol)
}
}
}
if n == 0 {
-
+ // No flow control tokens to send back.
return
}
defer cc.mu.Unlock()
var connAdd, streamAdd int32
-
+ // Check the conn-level first, before the stream-level.
if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 {
connAdd = http2transportDefaultConnFlow - v
cc.inflow.add(connAdd)
}
- if err == nil {
-
+ if err == nil { // No need to refresh if the stream is over or failed.
+ // Consider any buffered body data (read from the conn but not
+ // consumed by the client) when computing flow control for this
+ // stream.
v := int(cs.inflow.available()) + cs.bufPipe.Len()
if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh {
streamAdd = int32(http2transportDefaultStreamFlow - v)
cc.fr.WriteRSTStream(cs.ID, http2ErrCodeCancel)
cs.didReset = true
}
-
+ // Return connection-level flow control.
if unread > 0 {
cc.inflow.add(int32(unread))
cc.fr.WriteWindowUpdate(0, uint32(unread))
neverSent := cc.nextStreamID
cc.mu.Unlock()
if f.StreamID >= neverSent {
-
+ // We never asked for this.
cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
return http2ConnectionError(http2ErrCodeProtocol)
}
+ // We probably did ask for this, but canceled. Just ignore it.
+ // TODO: be stricter here? only silently ignore things which
+ // we canceled, but not things which were closed normally
+ // by the peer? Tough without accumulating too much state.
+ // But at least return their flow control:
if f.Length > 0 {
cc.mu.Lock()
cc.inflow.add(int32(f.Length))
return nil
}
if f.Length > 0 {
-
+ // Check connection-level flow control.
cc.mu.Lock()
if cs.inflow.available() >= int32(f.Length) {
cs.inflow.take(int32(f.Length))
cc.mu.Unlock()
return http2ConnectionError(http2ErrCodeFlowControl)
}
-
+ // Return any padded flow control now, since we won't
+ // refund it later on body reads.
if pad := int32(f.Length) - int32(len(data)); pad > 0 {
cs.inflow.add(pad)
cc.inflow.add(pad)
var http2errInvalidTrailers = errors.New("http2: invalid trailers")
func (rl *http2clientConnReadLoop) endStream(cs *http2clientStream) {
-
+ // TODO: check that any declared content-length matches, like
+ // server.go's (*stream).endStream method.
rl.endStreamError(cs, nil)
}
cc := rl.cc
cc.t.connPool().MarkDead(cc)
if f.ErrCode != 0 {
-
+ // TODO: deal with GOAWAY more. particularly the error code
cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
}
cc.setGoAway(f)
case http2SettingMaxConcurrentStreams:
cc.maxConcurrentStreams = s.Val
case http2SettingInitialWindowSize:
-
+ // Values above the maximum flow-control
+ // window size of 2^31-1 MUST be treated as a
+ // connection error (Section 5.4.1) of type
+ // FLOW_CONTROL_ERROR.
if s.Val > math.MaxInt32 {
return http2ConnectionError(http2ErrCodeFlowControl)
}
+ // Adjust flow control of currently-open
+ // frames by the difference of the old initial
+ // window size and this one.
delta := int32(s.Val) - int32(cc.initialWindowSize)
for _, cs := range cc.streams {
cs.flow.add(delta)
cc.initialWindowSize = s.Val
default:
-
+ // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
cc.vlogf("Unhandled Setting: %v", s)
}
return nil
func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) error {
cs := rl.cc.streamByID(f.StreamID, true)
if cs == nil {
-
+ // TODO: return error if server tries to RST_STEAM an idle stream
return nil
}
select {
case <-cs.peerReset:
-
+ // Already reset.
+ // This is the only goroutine
+ // which closes this, so there
+ // isn't a race.
default:
err := http2streamError(cs.ID, f.ErrCode)
cs.resetErr = err
close(cs.peerReset)
cs.bufPipe.CloseWithError(err)
- cs.cc.cond.Broadcast()
+ cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
}
delete(rl.activeRes, cs.ID)
return nil
return err
}
cc.mu.Lock()
-
+ // check for dup before insert
if _, found := cc.pings[p]; !found {
cc.pings[p] = c
cc.mu.Unlock()
case <-ctx.Done():
return ctx.Err()
case <-cc.readerDone:
-
+ // connection closed
return cc.readerErr
}
}
cc := rl.cc
cc.mu.Lock()
defer cc.mu.Unlock()
-
+ // If ack, notify listener if any
if c, ok := cc.pings[f.Data]; ok {
close(c)
delete(cc.pings, f.Data)
}
func (rl *http2clientConnReadLoop) processPushPromise(f *http2PushPromiseFrame) error {
-
+ // We told the peer we don't want them.
+ // Spec says:
+ // "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
+ // setting of the peer endpoint is set to 0. An endpoint that
+ // has set this setting and has received acknowledgement MUST
+ // treat the receipt of a PUSH_PROMISE frame as a connection
+ // error (Section 5.4.1) of type PROTOCOL_ERROR."
return http2ConnectionError(http2ErrCodeProtocol)
}
func (cc *http2ClientConn) writeStreamReset(streamID uint32, code http2ErrCode, err error) {
-
+ // TODO: map err to more interesting error codes, once the
+ // HTTP community comes up with some. But currently for
+ // RST_STREAM there's no equivalent to GOAWAY frame's debug
+ // data, and the error codes are all pretty vague ("cancel").
cc.wmu.Lock()
cc.fr.WriteRSTStream(streamID, code)
cc.bw.Flush()
func (s http2bodyWriterState) on100() {
if s.timer == nil {
-
+ // If we didn't do a delayed write, ignore the server's
+ // bogus 100 continue response.
return
}
s.timer.Stop()
// called until after the headers have been written.
func (s http2bodyWriterState) scheduleBodyWrite() {
if s.timer == nil {
-
+ // We're not doing a delayed write (see
+ // getBodyWriterState), so just start the writing
+ // goroutine immediately.
go s.fn()
return
}
case *http2writeResHeaders:
return v.endStream
case nil:
-
+ // This can only happen if the caller reuses w after it's
+ // been intentionally nil'ed out to prevent use. Keep this
+ // here to catch future refactoring breaking it.
panic("writeEndsStream called on nil writeFramer")
}
return false
func (p *http2writeGoAway) writeFrame(ctx http2writeContext) error {
err := ctx.Framer().WriteGoAway(p.maxStreamID, p.code, nil)
if p.code != 0 {
- ctx.Flush()
+ ctx.Flush() // ignore error: we're hanging up on them anyway
time.Sleep(50 * time.Millisecond)
ctx.CloseConn()
}
}
func (w *http2writeResHeaders) staysWithinBuffer(max int) bool {
-
+ // TODO: this is a common one. It'd be nice to return true
+ // here and get into the fast path if we could be clever and
+ // calculate the size fast enough, or at least a conservative
+ // uppper bound that usually fires. (Maybe if w.h and
+ // w.trailers are nil, so we don't need to enumerate it.)
+ // Otherwise I'm afraid that just calculating the length to
+ // answer this question would be slower than the ~2µs benefit.
return false
}
}
func (w *http2writePushPromise) staysWithinBuffer(max int) bool {
-
+ // TODO: see writeResHeaders.staysWithinBuffer
return false
}
}
func (w http2write100ContinueHeadersFrame) staysWithinBuffer(max int) bool {
-
+ // Sloppy but conservative:
return 9+2*(len(":status")+len("100")) <= max
}
func http2encodeHeaders(enc *hpack.Encoder, h Header, keys []string) {
if keys == nil {
sorter := http2sorterPool.Get().(*http2sorter)
-
+ // Using defer here, since the returned keys from the
+ // sorter.Keys method is only valid until the sorter
+ // is returned:
defer http2sorterPool.Put(sorter)
keys = sorter.Keys(h)
}
vv := h[k]
k = http2lowerHeader(k)
if !http2validWireHeaderFieldName(k) {
-
+ // Skip it as backup paranoia. Per
+ // golang.org/issue/14048, these should
+ // already be rejected at a higher level.
continue
}
isTE := k == "transfer-encoding"
for _, v := range vv {
if !httplex.ValidHeaderFieldValue(v) {
-
+ // TODO: return an error? golang.org/issue/14048
+ // For now just omit it.
continue
}
-
+ // TODO: more of "8.1.2.2 Connection-Specific Header Fields"
if isTE && v != "trailers" {
continue
}
func (wr http2FrameWriteRequest) StreamID() uint32 {
if wr.stream == nil {
if se, ok := wr.write.(http2StreamError); ok {
-
+ // (*serverConn).resetStream doesn't set
+ // stream because it doesn't necessarily have
+ // one. So special case this type of write
+ // message.
return se.StreamID
}
return 0
func (wr http2FrameWriteRequest) Consume(n int32) (http2FrameWriteRequest, http2FrameWriteRequest, int) {
var empty http2FrameWriteRequest
+ // Non-DATA frames are always consumed whole.
wd, ok := wr.write.(*http2writeData)
if !ok || len(wd.p) == 0 {
return wr, empty, 1
}
+ // Might need to split after applying limits.
allowed := wr.stream.flow.available()
if n < allowed {
allowed = n
write: &http2writeData{
streamID: wd.streamID,
p: wd.p[:allowed],
-
+ // Even if the original had endStream set, there
+ // are bytes remaining because len(wd.p) > allowed,
+ // so we know endStream is false.
endStream: false,
},
-
+ // Our caller is blocking on the final DATA frame, not
+ // this intermediate frame, so no need to wait.
done: nil,
}
rest := http2FrameWriteRequest{
return consumed, rest, 2
}
+ // The frame is consumed whole.
+ // NB: This cast cannot overflow because allowed is <= math.MaxInt32.
wr.stream.flow.take(int32(len(wd.p)))
return wr, empty, 1
}
default:
panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))
}
- wr.write = nil
+ wr.write = nil // prevent use (assume it's tainted after wr.done send)
}
// writeQueue is used by implementations of WriteScheduler.
panic("invalid use of queue")
}
wr := q.s[0]
-
+ // TODO: less copy-happy queue.
copy(q.s, q.s[1:])
q.s[len(q.s)-1] = http2FrameWriteRequest{}
q.s = q.s[:len(q.s)-1]
type http2writeQueuePool []*http2writeQueue
+// put inserts an unused writeQueue into the pool.
+
// put inserts an unused writeQueue into the pool.
func (p *http2writeQueuePool) put(q *http2writeQueue) {
for i := range q.s {
}
// RFC 7540, Section 5.3.5: the default weight is 16.
-const http2priorityDefaultWeight = 15 // 16 = 15 + 1
+const http2priorityDefaultWeight = 15
// PriorityWriteSchedulerConfig configures a priorityWriteScheduler.
type http2PriorityWriteSchedulerConfig struct {
// If cfg is nil, default options are used.
func http2NewPriorityWriteScheduler(cfg *http2PriorityWriteSchedulerConfig) http2WriteScheduler {
if cfg == nil {
-
+ // For justification of these defaults, see:
+ // https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY
cfg = &http2PriorityWriteSchedulerConfig{
MaxClosedNodesInTree: 10,
MaxIdleNodesInTree: 10,
if n.parent == parent {
return
}
-
+ // Unlink from current parent.
if parent := n.parent; parent != nil {
if n.prev == nil {
parent.kids = n.next
n.next.prev = n.prev
}
}
-
+ // Link to new parent.
+ // If parent=nil, remove n from the tree.
+ // Always insert at the head of parent.kids (this is assumed by walkReadyInOrder).
n.parent = parent
if parent == nil {
n.next = nil
return false
}
+ // Don't consider the root "open" when updating openParent since
+ // we can't send data frames on the root stream (only control frames).
if n.id != 0 {
openParent = openParent || (n.state == http2priorityNodeOpen)
}
+ // Common case: only one kid or all kids have the same weight.
+ // Some clients don't use weights; other clients (like web browsers)
+ // use mostly-linear priority trees.
w := n.kids.weight
needSort := false
for k := n.kids.next; k != nil; k = k.next {
return false
}
+ // Uncommon case: sort the child nodes. We remove the kids from the parent,
+ // then re-insert after sorting so we can reuse tmp for future sort calls.
*tmp = (*tmp)[:0]
for n.kids != nil {
*tmp = append(*tmp, n.kids)
}
sort.Sort(http2sortPriorityNodeSiblings(*tmp))
for i := len(*tmp) - 1; i >= 0; i-- {
- (*tmp)[i].setParent(n)
+ (*tmp)[i].setParent(n) // setParent inserts at the head of n.kids
}
for k := n.kids; k != nil; k = k.next {
if k.walkReadyInOrder(openParent, tmp, f) {
func (z http2sortPriorityNodeSiblings) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
func (z http2sortPriorityNodeSiblings) Less(i, k int) bool {
-
+ // Prefer the subtree that has sent fewer bytes relative to its weight.
+ // See sections 5.3.2 and 5.3.4.
wi, bi := float64(z[i].weight+1), float64(z[i].subtreeBytes)
wk, bk := float64(z[k].weight+1), float64(z[k].subtreeBytes)
if bi == 0 && bk == 0 {
}
func (ws *http2priorityWriteScheduler) OpenStream(streamID uint32, options http2OpenStreamOptions) {
-
+ // The stream may be currently idle but cannot be opened or closed.
if curr := ws.nodes[streamID]; curr != nil {
if curr.state != http2priorityNodeIdle {
panic(fmt.Sprintf("stream %d already opened", streamID))
return
}
+ // RFC 7540, Section 5.3.5:
+ // "All streams are initially assigned a non-exclusive dependency on stream 0x0.
+ // Pushed streams initially depend on their associated stream. In both cases,
+ // streams are assigned a default weight of 16."
parent := ws.nodes[options.PusherID]
if parent == nil {
parent = &ws.root
panic("adjustPriority on root")
}
+ // If streamID does not exist, there are two cases:
+ // - A closed stream that has been removed (this will have ID <= maxID)
+ // - An idle stream that is being used for "grouping" (this will have ID > maxID)
n := ws.nodes[streamID]
if n == nil {
if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n)
}
+ // Section 5.3.1: A dependency on a stream that is not currently in the tree
+ // results in that stream being given a default priority (Section 5.3.5).
parent := ws.nodes[priority.StreamDep]
if parent == nil {
n.setParent(&ws.root)
return
}
+ // Ignore if the client tries to make a node its own parent.
if n == parent {
return
}
+ // Section 5.3.3:
+ // "If a stream is made dependent on one of its own dependencies, the
+ // formerly dependent stream is first moved to be dependent on the
+ // reprioritized stream's previous parent. The moved dependency retains
+ // its weight."
+ //
+ // That is: if parent depends on n, move parent to depend on n.parent.
for x := parent.parent; x != nil; x = x.parent {
if x == n {
parent.setParent(n.parent)
}
}
+ // Section 5.3.3: The exclusive flag causes the stream to become the sole
+ // dependency of its parent stream, causing other dependencies to become
+ // dependent on the exclusive stream.
if priority.Exclusive {
k := parent.kids
for k != nil {
} else {
n = ws.nodes[id]
if n == nil {
-
+ // id is an idle or closed stream. wr should not be a HEADERS or
+ // DATA frame. However, wr can be a RST_STREAM. In this case, we
+ // push wr onto the root, rather than creating a new priorityNode,
+ // since RST_STREAM is tiny and the stream's priority is unknown
+ // anyway. See issue #17919.
if wr.DataSize() > 0 {
panic("add DATA on non-open stream")
}
return false
}
n.addBytes(int64(wr.DataSize()))
-
+ // If B depends on A and B continuously has data available but A
+ // does not, gradually increase the throttling limit to allow B to
+ // steal more and more bandwidth from A.
if openParent {
ws.writeThrottleLimit += 1024
if ws.writeThrottleLimit < 0 {
return
}
if len(*list) == maxSize {
-
+ // Remove the oldest node, then shift left.
ws.removeNode((*list)[0])
x := (*list)[1:]
copy(*list, x)
}
func (ws *http2randomWriteScheduler) OpenStream(streamID uint32, options http2OpenStreamOptions) {
-
+ // no-op: idle streams are not tracked
}
func (ws *http2randomWriteScheduler) CloseStream(streamID uint32) {
}
func (ws *http2randomWriteScheduler) AdjustStream(streamID uint32, priority http2PriorityParam) {
-
+ // no-op: priorities are ignored
}
func (ws *http2randomWriteScheduler) Push(wr http2FrameWriteRequest) {
}
func (ws *http2randomWriteScheduler) Pop() (http2FrameWriteRequest, bool) {
-
+ // Control frames first.
if !ws.zero.empty() {
return ws.zero.shift(), true
}
-
+ // Iterate over all non-idle streams until finding one that can be consumed.
for _, q := range ws.sq {
if wr, ok := q.consume(math.MaxInt32); ok {
return wr, true