From 504b8d15d656e1085dd0a741e294113dcc2bde9a Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Fri, 9 Jun 2017 02:49:07 +0000 Subject: [PATCH] net/http: regenerate http2 bundle with bundle fixes to include comments The golang.org/x/tools/cmd/bundle tool previously had a bug where it dropped some comments. This regenerates it with the fixed version (https://golang.org/cl/45117). (Upstream is still git rev 3470a06c1, from https://golang.org/cl/44331) Updates #20548 Change-Id: Ic5d9208a0c8f7facdb7b315c6acab66ace34c0a9 Reviewed-on: https://go-review.googlesource.com/45158 Reviewed-by: Hiroshi Ioka Reviewed-by: Brad Fitzpatrick --- src/net/http/h2_bundle.go | 949 ++++++++++++++++++++++++++++---------- 1 file changed, 711 insertions(+), 238 deletions(-) diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index bec9b0c467..12a25afb05 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -759,7 +759,7 @@ type http2dialCall struct { // requires p.mu is held. func (p *http2clientConnPool) getStartDialLocked(addr string) *http2dialCall { if call, ok := p.dialing[addr]; ok { - + // A dial is already in-flight. Don't start another. return call } call := &http2dialCall{p: p, done: make(chan struct{})} @@ -887,7 +887,12 @@ func (p *http2clientConnPool) MarkDead(cc *http2ClientConn) { func (p *http2clientConnPool) closeIdleConnections() { p.mu.Lock() defer p.mu.Unlock() - + // TODO: don't close a cc if it was just added to the pool + // milliseconds ago and has never been used. There's currently + // a small race window with the HTTP/1 Transport's integration + // where it can add an idle conn just before using it, and + // somebody else can concurrently call CloseIdleConns and + // break some caller's RoundTrip. for _, vv := range p.conns { for _, cc := range vv { cc.closeIfIdle() @@ -902,7 +907,8 @@ func http2filterOutClientConn(in []*http2ClientConn, exclude *http2ClientConn) [ out = append(out, v) } } - + // If we filtered it out, zero out the last item to prevent + // the GC from seeing it. if len(in) != len(out) { in[len(in)-1] = nil } @@ -943,7 +949,10 @@ func http2configureTransport(t1 *Transport) (*http2Transport, error) { go c.Close() return http2erringRoundTripper{err} } else if !used { - + // Turns out we don't need this c. + // For example, two goroutines made requests to the same host + // at the same time, both kicking off TCP dials. (since protocol + // was unknown) go c.Close() } return t2 @@ -1058,7 +1067,7 @@ func (b *http2dataBuffer) Read(p []byte) (int, error) { ntotal += n b.r += n b.size -= n - + // If the first chunk has been consumed, advance to the next chunk. if b.r == len(b.chunks[0]) { http2putDataBufferChunk(b.chunks[0]) end := len(b.chunks) - 1 @@ -1087,7 +1096,9 @@ func (b *http2dataBuffer) Len() int { func (b *http2dataBuffer) Write(p []byte) (int, error) { ntotal := len(p) for len(p) > 0 { - + // If the last chunk is empty, allocate a new chunk. Try to allocate + // enough to fully copy p plus any additional bytes we expect to + // receive. However, this may allocate less than len(p). want := int64(len(p)) if b.expected > want { want = b.expected @@ -1571,6 +1582,12 @@ type http2Framer struct { // If the limit is hit, MetaHeadersFrame.Truncated is set true. MaxHeaderListSize uint32 + // TODO: track which type of frame & with which flags was sent + // last. Then return an error (unless AllowIllegalWrites) if + // we're in the middle of a header block and a + // non-Continuation or Continuation on a different stream is + // attempted to be written. + logReads, logWrites bool debugFramer *http2Framer // only use for logging written writes @@ -1583,15 +1600,15 @@ type http2Framer struct { func (fr *http2Framer) maxHeaderListSize() uint32 { if fr.MaxHeaderListSize == 0 { - return 16 << 20 + return 16 << 20 // sane default, per docs } return fr.MaxHeaderListSize } func (f *http2Framer) startWrite(ftype http2FrameType, flags http2Flags, streamID uint32) { - + // Write the FrameHeader. f.wbuf = append(f.wbuf[:0], - 0, + 0, // 3 bytes of length, filled in in endWrite 0, 0, byte(ftype), @@ -1603,7 +1620,8 @@ func (f *http2Framer) startWrite(ftype http2FrameType, flags http2Flags, streamI } func (f *http2Framer) endWrite() error { - + // Now that we know the final size, fill in the FrameHeader in + // the space previously reserved for it. Abuse append. length := len(f.wbuf) - http2frameHeaderLen if length >= (1 << 24) { return http2ErrFrameTooLarge @@ -1627,8 +1645,9 @@ func (f *http2Framer) logWrite() { if f.debugFramer == nil { f.debugFramerBuf = new(bytes.Buffer) f.debugFramer = http2NewFramer(nil, f.debugFramerBuf) - f.debugFramer.logReads = false - + f.debugFramer.logReads = false // we log it ourselves, saying "wrote" below + // Let us read anything, even if we accidentally wrote it + // in the wrong order: f.debugFramer.AllowIllegalReads = true } f.debugFramerBuf.Write(f.wbuf) @@ -1845,7 +1864,11 @@ func (f *http2DataFrame) Data() []byte { func http2parseDataFrame(fc *http2frameCache, fh http2FrameHeader, payload []byte) (http2Frame, error) { if fh.StreamID == 0 { - + // DATA frames MUST be associated with a stream. If a + // DATA frame is received whose stream identifier + // field is 0x0, the recipient MUST respond with a + // connection error (Section 5.4.1) of type + // PROTOCOL_ERROR. return nil, http2connError{http2ErrCodeProtocol, "DATA frame with stream ID 0"} } f := fc.getDataFrame() @@ -1860,7 +1883,10 @@ func http2parseDataFrame(fc *http2frameCache, fh http2FrameHeader, payload []byt } } if int(padSize) > len(payload) { - + // If the length of the padding is greater than the + // length of the frame payload, the recipient MUST + // treat this as a connection error. + // Filed: https://github.com/http2/http2-spec/issues/610 return nil, http2connError{http2ErrCodeProtocol, "pad size larger than data payload"} } f.data = payload[:len(payload)-int(padSize)] @@ -1911,7 +1937,7 @@ func (f *http2Framer) WriteDataPadded(streamID uint32, endStream bool, data, pad if !f.AllowIllegalWrites { for _, b := range pad { if b != 0 { - + // "Padding octets MUST be set to zero when sending." return http2errPadBytes } } @@ -1945,20 +1971,33 @@ type http2SettingsFrame struct { func http2parseSettingsFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) { if fh.Flags.Has(http2FlagSettingsAck) && fh.Length > 0 { - + // When this (ACK 0x1) bit is set, the payload of the + // SETTINGS frame MUST be empty. Receipt of a + // SETTINGS frame with the ACK flag set and a length + // field value other than 0 MUST be treated as a + // connection error (Section 5.4.1) of type + // FRAME_SIZE_ERROR. return nil, http2ConnectionError(http2ErrCodeFrameSize) } if fh.StreamID != 0 { - + // SETTINGS frames always apply to a connection, + // never a single stream. The stream identifier for a + // SETTINGS frame MUST be zero (0x0). If an endpoint + // receives a SETTINGS frame whose stream identifier + // field is anything other than 0x0, the endpoint MUST + // respond with a connection error (Section 5.4.1) of + // type PROTOCOL_ERROR. return nil, http2ConnectionError(http2ErrCodeProtocol) } if len(p)%6 != 0 { - + // Expecting even number of 6 byte settings. return nil, http2ConnectionError(http2ErrCodeFrameSize) } f := &http2SettingsFrame{http2FrameHeader: fh, p: p} if v, ok := f.Value(http2SettingInitialWindowSize); ok && v > (1<<31)-1 { - + // Values above the maximum flow control window size of 2^31 - 1 MUST + // be treated as a connection error (Section 5.4.1) of type + // FLOW_CONTROL_ERROR. return nil, http2ConnectionError(http2ErrCodeFlowControl) } return f, nil @@ -2127,9 +2166,14 @@ func http2parseWindowUpdateFrame(_ *http2frameCache, fh http2FrameHeader, p []by if len(p) != 4 { return nil, http2ConnectionError(http2ErrCodeFrameSize) } - inc := binary.BigEndian.Uint32(p[:4]) & 0x7fffffff + inc := binary.BigEndian.Uint32(p[:4]) & 0x7fffffff // mask off high reserved bit if inc == 0 { - + // A receiver MUST treat the receipt of a + // WINDOW_UPDATE frame with an flow control window + // increment of 0 as a stream error (Section 5.4.2) of + // type PROTOCOL_ERROR; errors on the connection flow + // control window MUST be treated as a connection + // error (Section 5.4.1). if fh.StreamID == 0 { return nil, http2ConnectionError(http2ErrCodeProtocol) } @@ -2146,7 +2190,7 @@ func http2parseWindowUpdateFrame(_ *http2frameCache, fh http2FrameHeader, p []by // If the Stream ID is zero, the window update applies to the // connection as a whole. func (f *http2Framer) WriteWindowUpdate(streamID, incr uint32) error { - + // "The legal range for the increment to the flow control window is 1 to 2^31-1 (2,147,483,647) octets." if (incr < 1 || incr > 2147483647) && !f.AllowIllegalWrites { return errors.New("illegal window increment value") } @@ -2188,7 +2232,10 @@ func http2parseHeadersFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) ( http2FrameHeader: fh, } if fh.StreamID == 0 { - + // HEADERS frames MUST be associated with a stream. If a HEADERS frame + // is received whose stream identifier field is 0x0, the recipient MUST + // respond with a connection error (Section 5.4.1) of type + // PROTOCOL_ERROR. return nil, http2connError{http2ErrCodeProtocol, "HEADERS frame with stream ID 0"} } var padLength uint8 @@ -2204,7 +2251,7 @@ func http2parseHeadersFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) ( return nil, err } hf.Priority.StreamDep = v & 0x7fffffff - hf.Priority.Exclusive = (v != hf.Priority.StreamDep) + hf.Priority.Exclusive = (v != hf.Priority.StreamDep) // high bit was set p, hf.Priority.Weight, err = http2readByte(p) if err != nil { return nil, err @@ -2325,13 +2372,13 @@ func http2parsePriorityFrame(_ *http2frameCache, fh http2FrameHeader, payload [] return nil, http2connError{http2ErrCodeFrameSize, fmt.Sprintf("PRIORITY frame payload size was %d; want 5", len(payload))} } v := binary.BigEndian.Uint32(payload[:4]) - streamID := v & 0x7fffffff + streamID := v & 0x7fffffff // mask off high bit return &http2PriorityFrame{ http2FrameHeader: fh, http2PriorityParam: http2PriorityParam{ Weight: payload[4], StreamDep: streamID, - Exclusive: streamID != v, + Exclusive: streamID != v, // was high bit set? }, }, nil } @@ -2449,7 +2496,12 @@ func http2parsePushPromise(_ *http2frameCache, fh http2FrameHeader, p []byte) (_ http2FrameHeader: fh, } if pp.StreamID == 0 { - + // PUSH_PROMISE frames MUST be associated with an existing, + // peer-initiated stream. The stream identifier of a + // PUSH_PROMISE frame indicates the stream it is associated + // with. If the stream identifier field specifies the value + // 0x0, a recipient MUST respond with a connection error + // (Section 5.4.1) of type PROTOCOL_ERROR. return nil, http2ConnectionError(http2ErrCodeProtocol) } // The PUSH_PROMISE frame includes optional padding. @@ -2468,7 +2520,7 @@ func http2parsePushPromise(_ *http2frameCache, fh http2FrameHeader, p []byte) (_ pp.PromiseID = pp.PromiseID & (1<<31 - 1) if int(padLength) > len(p) { - + // like the DATA frame, error out if padding is longer than the body. return nil, http2ConnectionError(http2ErrCodeProtocol) } pp.headerFragBuf = p[:len(p)-int(padLength)] @@ -2638,7 +2690,9 @@ func (mh *http2MetaHeadersFrame) checkPseudos() error { default: return http2pseudoHeaderError(hf.Name) } - + // Check for duplicates. + // This would be a bad algorithm, but N is 4. + // And this doesn't allocate. for _, hf2 := range pf[:i] { if hf.Name == hf2.Name { return http2duplicatePseudoHeaderError(hf.Name) @@ -2656,7 +2710,8 @@ func (fr *http2Framer) maxHeaderStringLen() int { if uint32(int(v)) == v { return int(v) } - + // They had a crazy big number for MaxHeaderBytes anyway, + // so give them unlimited header lengths: return 0 } @@ -2711,7 +2766,7 @@ func (fr *http2Framer) readMetaFrame(hf *http2HeadersFrame) (*http2MetaHeadersFr mh.Fields = append(mh.Fields, hf) }) - + // Lose reference to MetaHeadersFrame: defer hdec.SetEmitFunc(func(hf hpack.HeaderField) {}) var hc http2headersOrContinuation = hf @@ -2727,7 +2782,7 @@ func (fr *http2Framer) readMetaFrame(hf *http2HeadersFrame) (*http2MetaHeadersFr if f, err := fr.ReadFrame(); err != nil { return nil, err } else { - hc = f.(*http2ContinuationFrame) + hc = f.(*http2ContinuationFrame) // guaranteed by checkFrameOrder } } @@ -2769,7 +2824,7 @@ func http2summarizeFrame(f http2Frame) string { return nil }) if n > 0 { - buf.Truncate(buf.Len() - 1) + buf.Truncate(buf.Len() - 1) // remove trailing comma } case *http2DataFrame: data := f.Data() @@ -2894,7 +2949,7 @@ func (cc *http2ClientConn) Ping(ctx context.Context) error { func http2cloneTLSConfig(c *tls.Config) *tls.Config { c2 := c.Clone() - c2.GetClientCertificate = c.GetClientCertificate + c2.GetClientCertificate = c.GetClientCertificate // golang.org/issue/19264 return c2 } @@ -2974,7 +3029,7 @@ func http2curGoroutineID() uint64 { defer http2littleBuf.Put(bp) b := *bp b = b[:runtime.Stack(b, false)] - + // Parse the 4707 out of "goroutine 4707 [" b = bytes.TrimPrefix(b, http2goroutineSpace) i := bytes.IndexByte(b, ' ') if i < 0 { @@ -3010,9 +3065,10 @@ func http2parseUintBytes(s []byte, base int, bitSize int) (n uint64, err error) goto Error case 2 <= base && base <= 36: + // valid base; nothing to do case base == 0: - + // Look for octal, hex prefix. switch { case s[0] == '0' && len(s) > 1 && (s[1] == 'x' || s[1] == 'X'): base = 16 @@ -3058,7 +3114,7 @@ func http2parseUintBytes(s []byte, base int, bitSize int) (n uint64, err error) } if n >= cutoff { - + // n*base overflows n = 1<<64 - 1 err = strconv.ErrRange goto Error @@ -3067,7 +3123,7 @@ func http2parseUintBytes(s []byte, base int, bitSize int) (n uint64, err error) n1 := n + uint64(v) if n1 < n || n1 > maxVal { - + // n+v overflows n = 1<<64 - 1 err = strconv.ErrRange goto Error @@ -3251,7 +3307,7 @@ func (s http2Setting) String() string { // Valid reports whether the setting is valid. func (s http2Setting) Valid() error { - + // Limits and error codes from 6.5.2 Defined SETTINGS Parameters switch s.ID { case http2SettingEnablePush: if s.Val != 1 && s.Val != 0 { @@ -3495,7 +3551,8 @@ func (s *http2sorter) Keys(h Header) []string { } func (s *http2sorter) SortStrings(ss []string) { - + // Our sorter works on s.v, which sorter owns, so + // stash it away while we sort the user's buffer. save := s.v s.v = ss sort.Sort(s) @@ -3560,8 +3617,8 @@ func (p *http2pipe) Read(d []byte) (n int, err error) { } if p.err != nil { if p.readFn != nil { - p.readFn() - p.readFn = nil + p.readFn() // e.g. copy trailers + p.readFn = nil // not sticky like p.err } p.b = nil return 0, p.err @@ -3585,7 +3642,7 @@ func (p *http2pipe) Write(d []byte) (n int, err error) { return 0, http2errClosedPipeWrite } if p.breakErr != nil { - return len(d), nil + return len(d), nil // discard when there is no reader } return p.b.Write(d) } @@ -3617,7 +3674,7 @@ func (p *http2pipe) closeWithError(dst *error, err error, fn func()) { } defer p.c.Signal() if *dst != nil { - + // Already been done. return } p.readFn = fn @@ -3633,7 +3690,8 @@ func (p *http2pipe) closeDoneLocked() { if p.donec == nil { return } - + // Close if unclosed. This isn't racy since we always + // hold p.mu while closing. select { case <-p.donec: default: @@ -3659,7 +3717,7 @@ func (p *http2pipe) Done() <-chan struct{} { if p.donec == nil { p.donec = make(chan struct{}) if p.err != nil || p.breakErr != nil { - + // Already hit an error. p.closeDoneLocked() } } @@ -3785,7 +3843,7 @@ type http2serverInternalState struct { func (s *http2serverInternalState) registerConn(sc *http2serverConn) { if s == nil { - return + return // if the Server was used without calling ConfigureServer } s.mu.Lock() s.activeConns[sc] = struct{}{} @@ -3794,7 +3852,7 @@ func (s *http2serverInternalState) registerConn(sc *http2serverConn) { func (s *http2serverInternalState) unregisterConn(sc *http2serverConn) { if s == nil { - return + return // if the Server was used without calling ConfigureServer } s.mu.Lock() delete(s.activeConns, sc) @@ -3803,7 +3861,7 @@ func (s *http2serverInternalState) unregisterConn(sc *http2serverConn) { func (s *http2serverInternalState) startGracefulShutdown() { if s == nil { - return + return // if the Server was used without calling ConfigureServer } s.mu.Lock() for sc := range s.activeConns { @@ -3856,6 +3914,13 @@ func http2ConfigureServer(s *Server, conf *http2Server) error { } } + // Note: not setting MinVersion to tls.VersionTLS12, + // as we don't want to interfere with HTTP/1.1 traffic + // on the user's server. We enforce TLS 1.2 later once + // we accept a connection. Ideally this should be done + // during next-proto selection, but using TLS <1.2 with + // HTTP/2 is still the client's bug. + s.TLSConfig.PreferServerCipherSuites = true haveNPN := false @@ -3946,10 +4011,10 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) { readFrameCh: make(chan http2readFrameResult), wantWriteFrameCh: make(chan http2FrameWriteRequest, 8), serveMsgCh: make(chan interface{}, 8), - wroteFrameCh: make(chan http2frameWriteResult, 1), - bodyReadCh: make(chan http2bodyReadMsg), + wroteFrameCh: make(chan http2frameWriteResult, 1), // buffered; one send in writeFrameAsync + bodyReadCh: make(chan http2bodyReadMsg), // buffering doesn't matter either way doneServing: make(chan struct{}), - clientMaxStreams: math.MaxUint32, + clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value" advMaxStreams: s.maxConcurrentStreams(), initialStreamSendWindowSize: http2initialWindowSize, maxFrameSize: http2initialMaxFrameSize, @@ -3961,6 +4026,11 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) { s.state.registerConn(sc) defer s.state.unregisterConn(sc) + // The net/http package sets the write deadline from the + // http.Server.WriteTimeout during the TLS handshake, but then + // passes the connection off to us with the deadline already set. + // Write deadlines are set per stream in serverConn.newStream. + // Disarm the net.Conn write deadline here. if sc.hs.WriteTimeout != 0 { sc.conn.SetWriteDeadline(time.Time{}) } @@ -3971,6 +4041,9 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) { sc.writeSched = http2NewRandomWriteScheduler() } + // These start at the RFC-specified defaults. If there is a higher + // configured value for inflow, that will be updated when we send a + // WINDOW_UPDATE shortly after sending SETTINGS. sc.flow.add(http2initialWindowSize) sc.inflow.add(http2initialWindowSize) sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf) @@ -3984,18 +4057,44 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) { if tc, ok := c.(http2connectionStater); ok { sc.tlsState = new(tls.ConnectionState) *sc.tlsState = tc.ConnectionState() - + // 9.2 Use of TLS Features + // An implementation of HTTP/2 over TLS MUST use TLS + // 1.2 or higher with the restrictions on feature set + // and cipher suite described in this section. Due to + // implementation limitations, it might not be + // possible to fail TLS negotiation. An endpoint MUST + // immediately terminate an HTTP/2 connection that + // does not meet the TLS requirements described in + // this section with a connection error (Section + // 5.4.1) of type INADEQUATE_SECURITY. if sc.tlsState.Version < tls.VersionTLS12 { sc.rejectConn(http2ErrCodeInadequateSecurity, "TLS version too low") return } if sc.tlsState.ServerName == "" { - + // Client must use SNI, but we don't enforce that anymore, + // since it was causing problems when connecting to bare IP + // addresses during development. + // + // TODO: optionally enforce? Or enforce at the time we receive + // a new request, and verify the the ServerName matches the :authority? + // But that precludes proxy situations, perhaps. + // + // So for now, do nothing here again. } if !s.PermitProhibitedCipherSuites && http2isBadCipher(sc.tlsState.CipherSuite) { - + // "Endpoints MAY choose to generate a connection error + // (Section 5.4.1) of type INADEQUATE_SECURITY if one of + // the prohibited cipher suites are negotiated." + // + // We choose that. In my opinion, the spec is weak + // here. It also says both parties must support at least + // TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 so there's no + // excuses here. If we really must, we could allow an + // "AllowInsecureWeakCiphers" option on the server later. + // Let's see how it plays out first. sc.rejectConn(http2ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite)) return } @@ -4009,7 +4108,7 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) { func (sc *http2serverConn) rejectConn(err http2ErrCode, debug string) { sc.vlogf("http2: server rejecting conn: %v, %s", err, debug) - + // ignoring errors. hanging up anyway. sc.framer.WriteGoAway(0, err, []byte(debug)) sc.bw.Flush() sc.conn.Close() @@ -4135,11 +4234,16 @@ func (sc *http2serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) { func (sc *http2serverConn) state(streamID uint32) (http2streamState, *http2stream) { sc.serveG.check() - + // http://tools.ietf.org/html/rfc7540#section-5.1 if st, ok := sc.streams[streamID]; ok { return st.state, st } - + // "The first use of a new stream identifier implicitly closes all + // streams in the "idle" state that might have been initiated by + // that peer with a lower-valued stream identifier. For example, if + // a client sends a HEADERS frame on stream 7 without ever sending a + // frame on stream 5, then stream 5 transitions to the "closed" + // state when the first frame for stream 7 is sent or received." if streamID%2 == 1 { if streamID <= sc.maxClientStreamID { return http2stateClosed, nil @@ -4193,11 +4297,18 @@ func http2isClosedConnError(err error) bool { return false } + // TODO: remove this string search and be more like the Windows + // case below. That might involve modifying the standard library + // to return better error types. str := err.Error() if strings.Contains(str, "use of closed network connection") { return true } + // TODO(bradfitz): x/tools/cmd/bundle doesn't really support + // build tags, so I can't make an http2_windows.go file with + // Windows-specific stuff. Fix that and move this, once we + // have a way to bundle this into std's net/http somehow. if runtime.GOOS == "windows" { if oe, ok := err.(*net.OpError); ok && oe.Op == "read" { if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" { @@ -4217,7 +4328,7 @@ func (sc *http2serverConn) condlogf(err error, format string, args ...interface{ return } if err == io.EOF || err == io.ErrUnexpectedEOF || http2isClosedConnError(err) { - + // Boring, expected errors. sc.vlogf(format, args...) } else { sc.logf(format, args...) @@ -4307,7 +4418,7 @@ func (sc *http2serverConn) stopShutdownTimer() { } func (sc *http2serverConn) notePanic() { - + // Note: this is for serverConn.serve panicking, not http.Handler code. if http2testHookOnPanicMu != nil { http2testHookOnPanicMu.Lock() defer http2testHookOnPanicMu.Unlock() @@ -4327,7 +4438,7 @@ func (sc *http2serverConn) serve() { defer sc.conn.Close() defer sc.closeAllStreamsOnConnClose() defer sc.stopShutdownTimer() - defer close(sc.doneServing) + defer close(sc.doneServing) // unblocks handlers trying to send if http2VerboseLogs { sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs) @@ -4343,6 +4454,8 @@ func (sc *http2serverConn) serve() { }) sc.unackedSettings++ + // Each connection starts with intialWindowSize inflow tokens. + // If a higher value is configured, we add more tokens. if diff := sc.srv.initialConnRecvWindowSize() - http2initialWindowSize; diff > 0 { sc.sendWindowUpdate(nil, int(diff)) } @@ -4351,7 +4464,10 @@ func (sc *http2serverConn) serve() { sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err) return } - + // Now that we've got the preface, get us out of the + // "StateNew" state. We can't go directly to idle, though. + // Active means we read some data and anticipate a request. We'll + // do another Active when we get a HEADERS frame. sc.setConnState(StateActive) sc.setConnState(StateIdle) @@ -4360,7 +4476,7 @@ func (sc *http2serverConn) serve() { defer sc.idleTimer.Stop() } - go sc.readFrames() + go sc.readFrames() // closed by defer sc.conn.Close above settingsTimer := time.AfterFunc(http2firstSettingsTimeout, sc.onSettingsTimer) defer settingsTimer.Stop() @@ -4391,7 +4507,7 @@ func (sc *http2serverConn) serve() { case msg := <-sc.serveMsgCh: switch v := msg.(type) { case func(int): - v(loopNum) + v(loopNum) // for testing case *http2serverMessage: switch v { case http2settingsTimerMsg: @@ -4446,7 +4562,7 @@ func (sc *http2serverConn) onIdleTimer() { sc.sendServeMsg(http2idleTimerMsg) } func (sc *http2serverConn) onShutdownTimer() { sc.sendServeMsg(http2shutdownTimerMsg) } func (sc *http2serverConn) sendServeMsg(msg interface{}) { - sc.serveG.checkNotOn() + sc.serveG.checkNotOn() // NOT select { case sc.serveMsgCh <- msg: case <-sc.doneServing: @@ -4458,7 +4574,7 @@ func (sc *http2serverConn) sendServeMsg(msg interface{}) { func (sc *http2serverConn) readPreface() error { errc := make(chan error, 1) go func() { - + // Read the client preface buf := make([]byte, len(http2ClientPreface)) if _, err := io.ReadFull(sc.conn, buf); err != nil { errc <- err @@ -4468,7 +4584,7 @@ func (sc *http2serverConn) readPreface() error { errc <- nil } }() - timer := time.NewTimer(http2prefaceTimeout) + timer := time.NewTimer(http2prefaceTimeout) // TODO: configurable on *Server? defer timer.Stop() select { case <-timer.C: @@ -4512,7 +4628,13 @@ func (sc *http2serverConn) writeDataFromHandler(stream *http2stream, data []byte case <-sc.doneServing: return http2errClientDisconnected case <-stream.cw: - + // If both ch and stream.cw were ready (as might + // happen on the final Write after an http.Handler + // ends), prefer the write result. Otherwise this + // might just be us successfully closing the stream. + // The writeFrameAsync and serve goroutines guarantee + // that the ch send will happen before the stream.cw + // close. select { case err = <-ch: frameWriteDone = true @@ -4535,12 +4657,13 @@ func (sc *http2serverConn) writeDataFromHandler(stream *http2stream, data []byte // buffered and is read by serve itself). If you're on the serve // goroutine, call writeFrame instead. func (sc *http2serverConn) writeFrameFromHandler(wr http2FrameWriteRequest) error { - sc.serveG.checkNotOn() + sc.serveG.checkNotOn() // NOT select { case sc.wantWriteFrameCh <- wr: return nil case <-sc.doneServing: - + // Serve loop is gone. + // Client has closed their connection to the server. return http2errClientDisconnected } } @@ -4559,6 +4682,24 @@ func (sc *http2serverConn) writeFrame(wr http2FrameWriteRequest) { // If true, wr will not be written and wr.done will not be signaled. var ignoreWrite bool + // We are not allowed to write frames on closed streams. RFC 7540 Section + // 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on + // a closed stream." Our server never sends PRIORITY, so that exception + // does not apply. + // + // The serverConn might close an open stream while the stream's handler + // is still running. For example, the server might close a stream when it + // receives bad data from the client. If this happens, the handler might + // attempt to write a frame after the stream has been closed (since the + // handler hasn't yet been notified of the close). In this case, we simply + // ignore the frame. The handler will notice that the stream is closed when + // it waits for the frame to be written. + // + // As an exception to this rule, we allow sending RST_STREAM after close. + // This allows us to immediately reject new streams without tracking any + // state for those streams (except for the queued RST_STREAM frame). This + // may result in duplicate RST_STREAMs in some cases, but the client should + // ignore those. if wr.StreamID() != 0 { _, isReset := wr.write.(http2StreamError) if state, _ := sc.state(wr.StreamID()); state == http2stateClosed && !isReset { @@ -4566,12 +4707,15 @@ func (sc *http2serverConn) writeFrame(wr http2FrameWriteRequest) { } } + // Don't send a 100-continue response if we've already sent headers. + // See golang.org/issue/14030. switch wr.write.(type) { case *http2writeResHeaders: wr.stream.wroteHeaders = true case http2write100ContinueHeadersFrame: if wr.stream.wroteHeaders { - + // We do not need to notify wr.done because this frame is + // never written with wr.done != nil. if wr.done != nil { panic("wr.done != nil for write100ContinueHeadersFrame") } @@ -4600,7 +4744,8 @@ func (sc *http2serverConn) startFrameWrite(wr http2FrameWriteRequest) { case http2stateHalfClosedLocal: switch wr.write.(type) { case http2StreamError, http2handlerPanicRST, http2writeWindowUpdate: - + // RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE + // in this state. (We never send PRIORITY from the server, so that is not checked.) default: panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr)) } @@ -4654,9 +4799,21 @@ func (sc *http2serverConn) wroteFrame(res http2frameWriteResult) { } switch st.state { case http2stateOpen: - + // Here we would go to stateHalfClosedLocal in + // theory, but since our handler is done and + // the net/http package provides no mechanism + // for closing a ResponseWriter while still + // reading data (see possible TODO at top of + // this file), we go into closed state here + // anyway, after telling the peer we're + // hanging up on them. We'll transition to + // stateClosed after the RST_STREAM frame is + // written. st.state = http2stateHalfClosedLocal - + // Section 8.1: a server MAY request that the client abort + // transmission of a request without error by sending a + // RST_STREAM with an error code of NO_ERROR after sending + // a complete response. sc.resetStream(http2streamError(st.id, http2ErrCodeNo)) case http2stateHalfClosedRemote: sc.closeStream(st, http2errHandlerComplete) @@ -4664,7 +4821,7 @@ func (sc *http2serverConn) wroteFrame(res http2frameWriteResult) { } else { switch v := wr.write.(type) { case http2StreamError: - + // st may be unknown if the RST_STREAM was generated to reject bad input. if st, ok := sc.streams[v.StreamID]; ok { sc.closeStream(st, v) } @@ -4673,6 +4830,7 @@ func (sc *http2serverConn) wroteFrame(res http2frameWriteResult) { } } + // Reply (if requested) to unblock the ServeHTTP goroutine. wr.replyToWriter(res.err) sc.scheduleFrameWrite() @@ -4720,7 +4878,7 @@ func (sc *http2serverConn) scheduleFrameWrite() { } if sc.needsFrameFlush { sc.startFrameWrite(http2FrameWriteRequest{write: http2flushFrameWriter{}}) - sc.needsFrameFlush = false + sc.needsFrameFlush = false // after startFrameWrite, since it sets this true continue } break @@ -4736,7 +4894,7 @@ func (sc *http2serverConn) scheduleFrameWrite() { // startGracefulShutdown returns immediately; it does not wait until // the connection has shut down. func (sc *http2serverConn) startGracefulShutdown() { - sc.serveG.checkNotOn() + sc.serveG.checkNotOn() // NOT sc.shutdownOnce.Do(func() { sc.sendServeMsg(http2gracefulShutdownMsg) }) } @@ -4750,7 +4908,7 @@ func (sc *http2serverConn) goAway(code http2ErrCode) { if code != http2ErrCodeNo { forceCloseIn = 250 * time.Millisecond } else { - + // TODO: configurable forceCloseIn = 1 * time.Second } sc.goAwayIn(code, forceCloseIn) @@ -4792,11 +4950,18 @@ func (sc *http2serverConn) processFrameFromReader(res http2readFrameResult) bool if err != nil { if err == http2ErrFrameTooLarge { sc.goAway(http2ErrCodeFrameSize) - return true + return true // goAway will close the loop } clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || http2isClosedConnError(err) if clientGone { - + // TODO: could we also get into this state if + // the peer does a half close + // (e.g. CloseWrite) because they're done + // sending frames but they're still wanting + // our open replies? Investigate. + // TODO: add CloseWrite to crypto/tls.Conn first + // so we have a way to test this? I suppose + // just for testing we could have a non-TLS mode. return false } } else { @@ -4820,7 +4985,7 @@ func (sc *http2serverConn) processFrameFromReader(res http2readFrameResult) bool case http2ConnectionError: sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev) sc.goAway(http2ErrCode(ev)) - return true + return true // goAway will handle shutdown default: if res.err != nil { sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err) @@ -4834,6 +4999,7 @@ func (sc *http2serverConn) processFrameFromReader(res http2readFrameResult) bool func (sc *http2serverConn) processFrame(f http2Frame) error { sc.serveG.check() + // First frame received must be SETTINGS. if !sc.sawFirstSettings { if _, ok := f.(*http2SettingsFrame); !ok { return http2ConnectionError(http2ErrCodeProtocol) @@ -4859,7 +5025,8 @@ func (sc *http2serverConn) processFrame(f http2Frame) error { case *http2GoAwayFrame: return sc.processGoAway(f) case *http2PushPromiseFrame: - + // A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE + // frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR. return http2ConnectionError(http2ErrCodeProtocol) default: sc.vlogf("http2: server ignoring frame: %v", f.Header()) @@ -4870,11 +5037,16 @@ func (sc *http2serverConn) processFrame(f http2Frame) error { func (sc *http2serverConn) processPing(f *http2PingFrame) error { sc.serveG.check() if f.IsAck() { - + // 6.7 PING: " An endpoint MUST NOT respond to PING frames + // containing this flag." return nil } if f.StreamID != 0 { - + // "PING frames are not associated with any individual + // stream. If a PING frame is received with a stream + // identifier field value other than 0x0, the recipient MUST + // respond with a connection error (Section 5.4.1) of type + // PROTOCOL_ERROR." return http2ConnectionError(http2ErrCodeProtocol) } if sc.inGoAway && sc.goAwayCode != http2ErrCodeNo { @@ -4887,20 +5059,27 @@ func (sc *http2serverConn) processPing(f *http2PingFrame) error { func (sc *http2serverConn) processWindowUpdate(f *http2WindowUpdateFrame) error { sc.serveG.check() switch { - case f.StreamID != 0: + case f.StreamID != 0: // stream-level flow control state, st := sc.state(f.StreamID) if state == http2stateIdle { - + // Section 5.1: "Receiving any frame other than HEADERS + // or PRIORITY on a stream in this state MUST be + // treated as a connection error (Section 5.4.1) of + // type PROTOCOL_ERROR." return http2ConnectionError(http2ErrCodeProtocol) } if st == nil { - + // "WINDOW_UPDATE can be sent by a peer that has sent a + // frame bearing the END_STREAM flag. This means that a + // receiver could receive a WINDOW_UPDATE frame on a "half + // closed (remote)" or "closed" stream. A receiver MUST + // NOT treat this as an error, see Section 5.1." return nil } if !st.flow.add(int32(f.Increment)) { return http2streamError(f.StreamID, http2ErrCodeFlowControl) } - default: + default: // connection-level flow control if !sc.flow.add(int32(f.Increment)) { return http2goAwayFlowError{} } @@ -4914,7 +5093,11 @@ func (sc *http2serverConn) processResetStream(f *http2RSTStreamFrame) error { state, st := sc.state(f.StreamID) if state == http2stateIdle { - + // 6.4 "RST_STREAM frames MUST NOT be sent for a + // stream in the "idle" state. If a RST_STREAM frame + // identifying an idle stream is received, the + // recipient MUST treat this as a connection error + // (Section 5.4.1) of type PROTOCOL_ERROR. return http2ConnectionError(http2ErrCodeProtocol) } if st != nil { @@ -4949,12 +5132,13 @@ func (sc *http2serverConn) closeStream(st *http2stream, err error) { } } if p := st.body; p != nil { - + // Return any buffered unread bytes worth of conn-level flow control. + // See golang.org/issue/16481 sc.sendWindowUpdate(nil, p.Len()) p.CloseWithError(err) } - st.cw.Close() + st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc sc.writeSched.CloseStream(st.id) } @@ -4963,7 +5147,9 @@ func (sc *http2serverConn) processSettings(f *http2SettingsFrame) error { if f.IsAck() { sc.unackedSettings-- if sc.unackedSettings < 0 { - + // Why is the peer ACKing settings we never sent? + // The spec doesn't mention this case, but + // hang up on them anyway. return http2ConnectionError(http2ErrCodeProtocol) } return nil @@ -4995,11 +5181,13 @@ func (sc *http2serverConn) processSetting(s http2Setting) error { case http2SettingInitialWindowSize: return sc.processSettingInitialWindowSize(s.Val) case http2SettingMaxFrameSize: - sc.maxFrameSize = int32(s.Val) + sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31 case http2SettingMaxHeaderListSize: sc.peerMaxHeaderListSize = s.Val default: - + // Unknown setting: "An endpoint that receives a SETTINGS + // frame with any unknown or unsupported identifier MUST + // ignore that setting." if http2VerboseLogs { sc.vlogf("http2: server ignoring unknown setting %v", s) } @@ -5009,13 +5197,26 @@ func (sc *http2serverConn) processSetting(s http2Setting) error { func (sc *http2serverConn) processSettingInitialWindowSize(val uint32) error { sc.serveG.check() - + // Note: val already validated to be within range by + // processSetting's Valid call. + + // "A SETTINGS frame can alter the initial flow control window + // size for all current streams. When the value of + // SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST + // adjust the size of all stream flow control windows that it + // maintains by the difference between the new value and the + // old value." old := sc.initialStreamSendWindowSize sc.initialStreamSendWindowSize = int32(val) - growth := int32(val) - old + growth := int32(val) - old // may be negative for _, st := range sc.streams { if !st.flow.add(growth) { - + // 6.9.2 Initial Flow Control Window Size + // "An endpoint MUST treat a change to + // SETTINGS_INITIAL_WINDOW_SIZE that causes any flow + // control window to exceed the maximum size as a + // connection error (Section 5.4.1) of type + // FLOW_CONTROL_ERROR." return http2ConnectionError(http2ErrCodeFlowControl) } } @@ -5029,23 +5230,40 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { } data := f.Data() + // "If a DATA frame is received whose stream is not in "open" + // or "half closed (local)" state, the recipient MUST respond + // with a stream error (Section 5.4.2) of type STREAM_CLOSED." id := f.Header().StreamID state, st := sc.state(id) if id == 0 || state == http2stateIdle { - + // Section 5.1: "Receiving any frame other than HEADERS + // or PRIORITY on a stream in this state MUST be + // treated as a connection error (Section 5.4.1) of + // type PROTOCOL_ERROR." return http2ConnectionError(http2ErrCodeProtocol) } if st == nil || state != http2stateOpen || st.gotTrailerHeader || st.resetQueued { - + // This includes sending a RST_STREAM if the stream is + // in stateHalfClosedLocal (which currently means that + // the http.Handler returned, so it's done reading & + // done writing). Try to stop the client from sending + // more DATA. + + // But still enforce their connection-level flow control, + // and return any flow control bytes since we're not going + // to consume them. if sc.inflow.available() < int32(f.Length) { return http2streamError(id, http2ErrCodeFlowControl) } - + // Deduct the flow control from inflow, since we're + // going to immediately add it back in + // sendWindowUpdate, which also schedules sending the + // frames. sc.inflow.take(int32(f.Length)) - sc.sendWindowUpdate(nil, int(f.Length)) + sc.sendWindowUpdate(nil, int(f.Length)) // conn-level if st != nil && st.resetQueued { - + // Already have a stream error in flight. Don't send another. return nil } return http2streamError(id, http2ErrCodeStreamClosed) @@ -5054,12 +5272,13 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { panic("internal error: should have a body in this state") } + // Sender sending more than they'd declared? if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes { st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) return http2streamError(id, http2ErrCodeStreamClosed) } if f.Length > 0 { - + // Check whether the client has flow control quota. if st.inflow.available() < int32(f.Length) { return http2streamError(id, http2ErrCodeFlowControl) } @@ -5076,6 +5295,8 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { st.bodyBytes += int64(len(data)) } + // Return any padded flow control now, since we won't + // refund it later on body reads. if pad := int32(f.Length) - int32(len(data)); pad > 0 { sc.sendWindowUpdate32(nil, pad) sc.sendWindowUpdate32(st, pad) @@ -5095,7 +5316,8 @@ func (sc *http2serverConn) processGoAway(f *http2GoAwayFrame) error { sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f) } sc.startGracefulShutdownInternal() - + // http://tools.ietf.org/html/rfc7540#section-6.8 + // We should not create any new streams, which means we should disable push. sc.pushEnabled = false return nil } @@ -5126,7 +5348,7 @@ func (st *http2stream) endStream() { func (st *http2stream) copyTrailersToHandlerRequest() { for k, vv := range st.trailer { if _, ok := st.reqTrailer[k]; ok { - + // Only copy it over it was pre-declared. st.reqTrailer[k] = vv } } @@ -5142,22 +5364,35 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error { sc.serveG.check() id := f.StreamID if sc.inGoAway { - + // Ignore. return nil } - + // http://tools.ietf.org/html/rfc7540#section-5.1.1 + // Streams initiated by a client MUST use odd-numbered stream + // identifiers. [...] An endpoint that receives an unexpected + // stream identifier MUST respond with a connection error + // (Section 5.4.1) of type PROTOCOL_ERROR. if id%2 != 1 { return http2ConnectionError(http2ErrCodeProtocol) } - + // A HEADERS frame can be used to create a new stream or + // send a trailer for an open one. If we already have a stream + // open, let it process its own HEADERS frame (trailers at this + // point, if it's valid). if st := sc.streams[f.StreamID]; st != nil { if st.resetQueued { - + // We're sending RST_STREAM to close the stream, so don't bother + // processing this frame. return nil } return st.processTrailerHeaders(f) } + // [...] The identifier of a newly established stream MUST be + // numerically greater than all streams that the initiating + // endpoint has opened or reserved. [...] An endpoint that + // receives an unexpected stream identifier MUST respond with + // a connection error (Section 5.4.1) of type PROTOCOL_ERROR. if id <= sc.maxClientStreamID { return http2ConnectionError(http2ErrCodeProtocol) } @@ -5167,12 +5402,22 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error { sc.idleTimer.Stop() } + // http://tools.ietf.org/html/rfc7540#section-5.1.2 + // [...] Endpoints MUST NOT exceed the limit set by their peer. An + // endpoint that receives a HEADERS frame that causes their + // advertised concurrent stream limit to be exceeded MUST treat + // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR + // or REFUSED_STREAM. if sc.curClientStreams+1 > sc.advMaxStreams { if sc.unackedSettings == 0 { - + // They should know better. return http2streamError(id, http2ErrCodeProtocol) } - + // Assume it's a network race, where they just haven't + // received our last SETTINGS update. But actually + // this can't happen yet, because we don't yet provide + // a way for users to adjust server parameters at + // runtime. return http2streamError(id, http2ErrCodeRefusedStream) } @@ -5197,17 +5442,24 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error { if st.reqTrailer != nil { st.trailer = make(Header) } - st.body = req.Body.(*http2requestBody).pipe + st.body = req.Body.(*http2requestBody).pipe // may be nil st.declBodyBytes = req.ContentLength handler := sc.handler.ServeHTTP if f.Truncated { - + // Their header list was too long. Send a 431 error. handler = http2handleHeaderListTooLong } else if err := http2checkValidHTTP2RequestHeaders(req.Header); err != nil { handler = http2new400Handler(err) } + // The net/http package sets the read deadline from the + // http.Server.ReadTimeout during the TLS handshake, but then + // passes the connection off to us with the deadline already + // set. Disarm it here after the request headers are read, + // similar to how the http1 server works. Here it's + // technically more like the http1 Server's ReadHeaderTimeout + // (in Go 1.8), though. That's a more sane option anyway. if sc.hs.ReadTimeout != 0 { sc.conn.SetReadDeadline(time.Time{}) } @@ -5234,7 +5486,9 @@ func (st *http2stream) processTrailerHeaders(f *http2MetaHeadersFrame) error { for _, hf := range f.RegularFields() { key := sc.canonicalHeader(hf.Name) if !http2ValidTrailerHeader(key) { - + // TODO: send more details to the peer somehow. But http2 has + // no way to send debug data at a stream level. Discuss with + // HTTP folk. return http2streamError(st.id, http2ErrCodeProtocol) } st.trailer[key] = append(st.trailer[key], hf.Value) @@ -5246,7 +5500,10 @@ func (st *http2stream) processTrailerHeaders(f *http2MetaHeadersFrame) error { func http2checkPriority(streamID uint32, p http2PriorityParam) error { if streamID == p.StreamDep { - + // Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat + // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR." + // Section 5.3.3 says that a stream can depend on one of its dependencies, + // so it's only self-dependencies that are forbidden. return http2streamError(streamID, http2ErrCodeProtocol) } return nil @@ -5278,9 +5535,9 @@ func (sc *http2serverConn) newStream(id, pusherID uint32, state http2streamState cancelCtx: cancelCtx, } st.cw.Init() - st.flow.conn = &sc.flow + st.flow.conn = &sc.flow // link to conn-level counter st.flow.add(sc.initialStreamSendWindowSize) - st.inflow.conn = &sc.inflow + st.inflow.conn = &sc.inflow // link to conn-level counter st.inflow.add(sc.srv.initialStreamRecvWindowSize()) if sc.hs.WriteTimeout != 0 { st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout) @@ -5316,13 +5573,22 @@ func (sc *http2serverConn) newWriterAndRequest(st *http2stream, f *http2MetaHead return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol) } } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") { - + // See 8.1.2.6 Malformed Requests and Responses: + // + // Malformed requests or responses that are detected + // MUST be treated as a stream error (Section 5.4.2) + // of type PROTOCOL_ERROR." + // + // 8.1.2.3 Request Pseudo-Header Fields + // "All HTTP/2 requests MUST include exactly one valid + // value for the :method, :scheme, and :path + // pseudo-header fields" return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol) } bodyOpen := !f.StreamEnded() if rp.method == "HEAD" && bodyOpen { - + // HEAD requests can't have bodies return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol) } @@ -5369,7 +5635,7 @@ func (sc *http2serverConn) newWriterAndRequestNoBody(st *http2stream, rp http2re if needsContinue { rp.header.Del("Expect") } - + // Merge Cookie headers into one "; "-delimited value. if cookies := rp.header["Cookie"]; len(cookies) > 1 { rp.header.Set("Cookie", strings.Join(cookies, "; ")) } @@ -5381,7 +5647,8 @@ func (sc *http2serverConn) newWriterAndRequestNoBody(st *http2stream, rp http2re key = CanonicalHeaderKey(strings.TrimSpace(key)) switch key { case "Transfer-Encoding", "Trailer", "Content-Length": - + // Bogus. (copy of http1 rules) + // Ignore. default: if trailer == nil { trailer = make(Header) @@ -5396,7 +5663,7 @@ func (sc *http2serverConn) newWriterAndRequestNoBody(st *http2stream, rp http2re var requestURI string if rp.method == "CONNECT" { url_ = &url.URL{Host: rp.authority} - requestURI = rp.authority + requestURI = rp.authority // mimic HTTP/1 server behavior } else { var err error url_, err = url.ParseRequestURI(rp.path) @@ -5429,7 +5696,7 @@ func (sc *http2serverConn) newWriterAndRequestNoBody(st *http2stream, rp http2re rws := http2responseWriterStatePool.Get().(*http2responseWriterState) bwSave := rws.bw - *rws = http2responseWriterState{} + *rws = http2responseWriterState{} // zero all the fields rws.conn = sc rws.bw = bwSave rws.bw.Reset(http2chunkWriter{rws}) @@ -5452,7 +5719,7 @@ func (sc *http2serverConn) runHandler(rw *http2responseWriter, req *Request, han write: http2handlerPanicRST{rw.rws.stream.id}, stream: rw.rws.stream, }) - + // Same as net/http: if http2shouldLogPanic(e) { const size = 64 << 10 buf := make([]byte, size) @@ -5480,10 +5747,13 @@ func http2handleHeaderListTooLong(w ResponseWriter, r *Request) { // called from handler goroutines. // h may be nil. func (sc *http2serverConn) writeHeaders(st *http2stream, headerData *http2writeResHeaders) error { - sc.serveG.checkNotOn() + sc.serveG.checkNotOn() // NOT on var errc chan error if headerData.h != nil { - + // If there's a header map (which we don't own), so we have to block on + // waiting for this frame to be written, so an http.Flush mid-handler + // writes out the correct value of keys, before a handler later potentially + // mutates it. errc = http2errChanPool.Get().(chan error) } if err := sc.writeFrameFromHandler(http2FrameWriteRequest{ @@ -5526,7 +5796,7 @@ type http2bodyReadMsg struct { // Notes that the handler for the given stream ID read n bytes of its body // and schedules flow control tokens to be sent. func (sc *http2serverConn) noteBodyReadFromHandler(st *http2stream, n int, err error) { - sc.serveG.checkNotOn() + sc.serveG.checkNotOn() // NOT on if n > 0 { select { case sc.bodyReadCh <- http2bodyReadMsg{st, n}: @@ -5537,9 +5807,10 @@ func (sc *http2serverConn) noteBodyReadFromHandler(st *http2stream, n int, err e func (sc *http2serverConn) noteBodyRead(st *http2stream, n int) { sc.serveG.check() - sc.sendWindowUpdate(nil, n) + sc.sendWindowUpdate(nil, n) // conn-level if st.state != http2stateHalfClosedRemote && st.state != http2stateClosed { - + // Don't send this WINDOW_UPDATE if the stream is closed + // remotely. sc.sendWindowUpdate(st, n) } } @@ -5681,7 +5952,7 @@ func (rws *http2responseWriterState) hasTrailers() bool { return len(rws.trailer func (rws *http2responseWriterState) declareTrailer(k string) { k = CanonicalHeaderKey(k) if !http2ValidTrailerHeader(k) { - + // Forbidden by RFC 2616 14.40. rws.conn.logf("ignoring invalid trailer %q", k) return } @@ -5723,7 +5994,7 @@ func (rws *http2responseWriterState) writeChunk(p []byte) (n int, err error) { } var date string if _, ok := rws.snapHeader["Date"]; !ok { - + // TODO(bradfitz): be faster here, like net/http? measure. date = time.Now().UTC().Format(TimeFormat) } @@ -5761,7 +6032,7 @@ func (rws *http2responseWriterState) writeChunk(p []byte) (n int, err error) { endStream := rws.handlerDone && !rws.hasTrailers() if len(p) > 0 || endStream { - + // only send a 0 byte DATA frame if we're ending the stream. if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil { return 0, err } @@ -5838,11 +6109,14 @@ func (w *http2responseWriter) Flush() { } if rws.bw.Buffered() > 0 { if err := rws.bw.Flush(); err != nil { - + // Ignore the error. The frame writer already knows. return } } else { - + // The bufio.Writer won't call chunkWriter.Write + // (writeChunk with zero bytes, so we have to do it + // ourselves to force the HTTP response header and/or + // final DATA frame (with END_STREAM) to be sent. rws.writeChunk(nil) } } @@ -5859,7 +6133,7 @@ func (w *http2responseWriter) CloseNotify() <-chan bool { rws.closeNotifierCh = ch cw := rws.stream.cw go func() { - cw.Wait() + cw.Wait() // wait for close ch <- true }() } @@ -5934,9 +6208,9 @@ func (w *http2responseWriter) write(lenData int, dataB []byte, dataS string) (n if !http2bodyAllowedForStatus(rws.status) { return 0, ErrBodyNotAllowed } - rws.wroteBytes += int64(len(dataB)) + int64(len(dataS)) + rws.wroteBytes += int64(len(dataB)) + int64(len(dataS)) // only one can be set if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen { - + // TODO: send a RST_STREAM return 0, errors.New("http2: handler wrote more than declared Content-Length") } @@ -5973,10 +6247,13 @@ func (w *http2responseWriter) push(target string, opts http2pushOptions) error { sc := st.sc sc.serveG.checkNotOn() + // No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream." + // http://tools.ietf.org/html/rfc7540#section-6.6 if st.isPushed() { return http2ErrRecursivePush } + // Default options. if opts.Method == "" { opts.Method = "GET" } @@ -5988,6 +6265,7 @@ func (w *http2responseWriter) push(target string, opts http2pushOptions) error { wantScheme = "https" } + // Validate the request. u, err := url.Parse(target) if err != nil { return err @@ -6010,7 +6288,10 @@ func (w *http2responseWriter) push(target string, opts http2pushOptions) error { if strings.HasPrefix(k, ":") { return fmt.Errorf("promised request headers cannot include pseudo header %q", k) } - + // These headers are meaningful only if the request has a body, + // but PUSH_PROMISE requests cannot have a body. + // http://tools.ietf.org/html/rfc7540#section-8.2 + // Also disallow Host, since the promised URL must be absolute. switch strings.ToLower(k) { case "content-length", "content-encoding", "trailer", "te", "expect", "host": return fmt.Errorf("promised request headers cannot include %q", k) @@ -6020,6 +6301,9 @@ func (w *http2responseWriter) push(target string, opts http2pushOptions) error { return err } + // The RFC effectively limits promised requests to GET and HEAD: + // "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]" + // http://tools.ietf.org/html/rfc7540#section-8.2 if opts.Method != "GET" && opts.Method != "HEAD" { return fmt.Errorf("method %q must be GET or HEAD", opts.Method) } @@ -6062,28 +6346,41 @@ type http2startPushRequest struct { func (sc *http2serverConn) startPush(msg *http2startPushRequest) { sc.serveG.check() + // http://tools.ietf.org/html/rfc7540#section-6.6. + // PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that + // is in either the "open" or "half-closed (remote)" state. if msg.parent.state != http2stateOpen && msg.parent.state != http2stateHalfClosedRemote { - + // responseWriter.Push checks that the stream is peer-initiaed. msg.done <- http2errStreamClosed return } + // http://tools.ietf.org/html/rfc7540#section-6.6. if !sc.pushEnabled { msg.done <- ErrNotSupported return } + // PUSH_PROMISE frames must be sent in increasing order by stream ID, so + // we allocate an ID for the promised stream lazily, when the PUSH_PROMISE + // is written. Once the ID is allocated, we start the request handler. allocatePromisedID := func() (uint32, error) { sc.serveG.check() + // Check this again, just in case. Technically, we might have received + // an updated SETTINGS by the time we got around to writing this frame. if !sc.pushEnabled { return 0, ErrNotSupported } - + // http://tools.ietf.org/html/rfc7540#section-6.5.2. if sc.curPushedStreams+1 > sc.clientMaxStreams { return 0, http2ErrPushLimitReached } + // http://tools.ietf.org/html/rfc7540#section-5.1.1. + // Streams initiated by the server MUST use even-numbered identifiers. + // A server that is unable to establish a new stream identifier can send a GOAWAY + // frame so that the client is forced to open a new connection for new streams. if sc.maxPushPromiseID+2 >= 1<<31 { sc.startGracefulShutdownInternal() return 0, http2ErrPushLimitReached @@ -6091,16 +6388,21 @@ func (sc *http2serverConn) startPush(msg *http2startPushRequest) { sc.maxPushPromiseID += 2 promisedID := sc.maxPushPromiseID + // http://tools.ietf.org/html/rfc7540#section-8.2. + // Strictly speaking, the new stream should start in "reserved (local)", then + // transition to "half closed (remote)" after sending the initial HEADERS, but + // we start in "half closed (remote)" for simplicity. + // See further comments at the definition of stateHalfClosedRemote. promised := sc.newStream(promisedID, msg.parent.id, http2stateHalfClosedRemote) rw, req, err := sc.newWriterAndRequestNoBody(promised, http2requestParam{ method: msg.method, scheme: msg.url.Scheme, authority: msg.url.Host, path: msg.url.RequestURI(), - header: http2cloneHeader(msg.header), + header: http2cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE }) if err != nil { - + // Should not happen, since we've already validated msg.url. panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err)) } @@ -6310,7 +6612,7 @@ var http2errTransportVersion = errors.New("http2: ConfigureTransport is only sup // It requires Go 1.6 or later and returns an error if the net/http package is too old // or if t1 has already been HTTP/2-enabled. func http2ConfigureTransport(t1 *Transport) error { - _, err := http2configureTransport(t1) + _, err := http2configureTransport(t1) // in configure_transport.go (go1.6) or not_go16.go return err } @@ -6493,7 +6795,7 @@ func (t *http2Transport) RoundTrip(req *Request) (*Response, error) { // and returns a host:port. The port 443 is added if needed. func http2authorityAddr(scheme string, authority string) (addr string) { host, port, err := net.SplitHostPort(authority) - if err != nil { + if err != nil { // authority didn't have a port port = "443" if scheme == "http" { port = "80" @@ -6503,7 +6805,7 @@ func http2authorityAddr(scheme string, authority string) (addr string) { if a, err := idna.ToASCII(host); err == nil { host = a } - + // IPv6 address literal, without a port: if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") { return host + ":" + port } @@ -6566,12 +6868,14 @@ func http2shouldRetryRequest(req *Request, err error) (*Request, error) { case http2errClientConnUnusable, http2errClientConnGotGoAway: return req, nil case http2errClientConnGotGoAwayAfterSomeReqBody: - + // If the Body is nil (or http.NoBody), it's safe to reuse + // this request and its Body. if req.Body == nil || http2reqBodyIsNoBody(req.Body) { return req, nil } - - getBody := http2reqGetBody(req) + // Otherwise we depend on the Request having its GetBody + // func defined. + getBody := http2reqGetBody(req) // Go 1.8: getBody = req.GetBody if getBody == nil { return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error") } @@ -6664,9 +6968,9 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client tconn: c, readerDone: make(chan struct{}), nextStreamID: 1, - maxFrameSize: 16 << 10, - initialWindowSize: 65535, - maxConcurrentStreams: 1000, + maxFrameSize: 16 << 10, // spec default + initialWindowSize: 65535, // spec default + maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough. streams: make(map[uint32]*http2clientStream), singleUse: singleUse, wantSettingsAck: true, @@ -6683,12 +6987,16 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client cc.cond = sync.NewCond(&cc.mu) cc.flow.add(int32(http2initialWindowSize)) + // TODO: adjust this writer size to account for frame size + + // MTU + crypto/tls record padding. cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr}) cc.br = bufio.NewReader(c) cc.fr = http2NewFramer(cc.bw, cc.br) cc.fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil) cc.fr.MaxHeaderListSize = t.maxHeaderListSize() + // TODO: SetMaxDynamicTableSize, SetMaxDynamicTableSizeLimit on + // henc in response to SETTINGS frames? cc.henc = hpack.NewEncoder(&cc.hbuf) if cs, ok := c.(http2connectionStater); ok { @@ -6724,6 +7032,7 @@ func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) { old := cc.goAway cc.goAway = f + // Merge the previous and current GoAway error frames. if cc.goAwayDebug == "" { cc.goAwayDebug = string(f.DebugData()) } @@ -6774,7 +7083,7 @@ func (cc *http2ClientConn) closeIfIdle() { } cc.closed = true nextID := cc.nextStreamID - + // TODO: do clients send GOAWAY too? maybe? Just Close: cc.mu.Unlock() if http2VerboseLogs { @@ -6820,7 +7129,7 @@ func (cc *http2ClientConn) putFrameScratchBuffer(buf []byte) { return } } - + // forget about it. } // errRequestCanceled is a copy of net/http's errRequestCanceled because it's not @@ -6848,7 +7157,10 @@ func (cc *http2ClientConn) responseHeaderTimeout() time.Duration { if cc.t.t1 != nil { return cc.t.t1.ResponseHeaderTimeout } - + // No way to do this (yet?) with just an http2.Transport. Probably + // no need. Request.Cancel this is the new way. We only need to support + // this for compatibility with the old http.Transport fields when + // we're doing transparent http2. return 0 } @@ -6912,10 +7224,24 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { req.Header.Get("Accept-Encoding") == "" && req.Header.Get("Range") == "" && req.Method != "HEAD" { - + // Request gzip only, not deflate. Deflate is ambiguous and + // not as universally supported anyway. + // See: http://www.gzip.org/zlib/zlib_faq.html#faq38 + // + // Note that we don't request this for HEAD requests, + // due to a bug in nginx: + // http://trac.nginx.org/nginx/ticket/358 + // https://golang.org/issue/5522 + // + // We don't request gzip if the request is for a range, since + // auto-decoding a portion of a gzipped document will just fail + // anyway. See https://golang.org/issue/8923 requestedGzip = true } + // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is + // sent by writeRequestBody below, along with any Trailers, + // again in form HEADERS{1}, CONTINUATION{0,}) hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen) if err != nil { cc.mu.Unlock() @@ -6938,11 +7264,12 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { if werr != nil { if hasBody { - req.Body.Close() + req.Body.Close() // per RoundTripper contract bodyWriter.cancel() } cc.forgetStreamID(cs.ID) - + // Don't bother sending a RST_STREAM (our write already failed; + // no need to keep writing) http2traceWroteRequest(cs.trace, werr) return nil, werr } @@ -6966,7 +7293,15 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { handleReadLoopResponse := func(re http2resAndError) (*Response, error) { res := re.res if re.err != nil || res.StatusCode > 299 { - + // On error or status code 3xx, 4xx, 5xx, etc abort any + // ongoing write, assuming that the server doesn't care + // about our request body. If the server replied with 1xx or + // 2xx, however, then assume the server DOES potentially + // want our body (e.g. full-duplex streaming: + // golang.org/issue/13444). If it turns out the server + // doesn't, they'll RST_STREAM us soon enough. This is a + // heuristic to avoid adding knobs to Transport. Hopefully + // we can keep it. bodyWriter.cancel() cs.abortRequestBodyWrite(http2errStopReqBodyWrite) } @@ -7018,10 +7353,12 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { } return nil, http2errRequestCanceled case <-cs.peerReset: - + // processResetStream already removed the + // stream from the streams map; no need for + // forgetStreamID. return nil, cs.resetErr case err := <-bodyWriter.resc: - + // Prefer the read loop's response, if available. Issue 16102. select { case re := <-readLoopResCh: return handleReadLoopResponse(re) @@ -7042,7 +7379,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { // requires cc.wmu be held func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error { - first := true + first := true // first frame written (HEADERS is first, then CONTINUATION) frameSize := int(cc.maxFrameSize) for len(hdrs) > 0 && cc.werr == nil { chunk := hdrs @@ -7063,7 +7400,10 @@ func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs [] cc.fr.WriteContinuation(streamID, endHeaders, chunk) } } - + // TODO(bradfitz): this Flush could potentially block (as + // could the WriteHeaders call(s) above), which means they + // wouldn't respond to Request.Cancel being readable. That's + // rare, but this should probably be in a goroutine. cc.bw.Flush() return cc.werr } @@ -7079,13 +7419,16 @@ var ( func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) { cc := cs.cc - sentEnd := false + sentEnd := false // whether we sent the final DATA frame w/ END_STREAM buf := cc.frameScratchBuffer() defer cc.putFrameScratchBuffer(buf) defer func() { http2traceWroteRequest(cs.trace, err) - + // TODO: write h12Compare test showing whether + // Request.Body is closed by the Transport, + // and in multiple cases: server replies <=299 and >299 + // while still writing request body cerr := bodyCloser.Close() if err == nil { err = cerr @@ -7124,7 +7467,12 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos sentEnd = sawEOF && len(remain) == 0 && !hasTrailers err = cc.fr.WriteData(cs.ID, sentEnd, data) if err == nil { - + // TODO(bradfitz): this flush is for latency, not bandwidth. + // Most requests won't need this. Make this opt-in or + // opt-out? Use some heuristic on the body type? Nagel-like + // timers? Based on 'n'? Only last chunk of this for loop, + // unless flow control tokens are low? For now, always. + // If we change this, see comment below. err = cc.bw.Flush() } cc.wmu.Unlock() @@ -7135,7 +7483,9 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos } if sentEnd { - + // Already sent END_STREAM (which implies we have no + // trailers) and flushed, because currently all + // WriteData frames above get a flush. So we're done. return nil } @@ -7149,6 +7499,8 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos cc.wmu.Lock() defer cc.wmu.Unlock() + // Two ways to send END_STREAM: either with trailers, or + // with an empty DATA frame. if len(trls) > 0 { err = cc.writeHeaders(cs.ID, true, trls) } else { @@ -7182,7 +7534,7 @@ func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err er take := a if int(take) > maxBytes { - take = int32(maxBytes) + take = int32(maxBytes) // can't truncate int; take is int32 } if take > int32(cc.maxFrameSize) { take = int32(cc.maxFrameSize) @@ -7230,6 +7582,9 @@ func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trail } } + // Check for any invalid headers and return an error before we + // potentially pollute our hpack state. (We want to be able to + // continue to reuse the hpack encoder for future requests) for k, vv := range req.Header { if !httplex.ValidHeaderFieldName(k) { return nil, fmt.Errorf("invalid HTTP header name %q", k) @@ -7241,6 +7596,11 @@ func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trail } } + // 8.1.2.3 Request Pseudo-Header Fields + // The :path pseudo-header field includes the path and query parts of the + // target URI (the path-absolute production and optionally a '?' character + // followed by the query production (see Sections 3.3 and 3.4 of + // [RFC3986]). cc.writeHeader(":authority", host) cc.writeHeader(":method", req.Method) if req.Method != "CONNECT" { @@ -7256,13 +7616,20 @@ func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trail lowKey := strings.ToLower(k) switch lowKey { case "host", "content-length": - + // Host is :authority, already sent. + // Content-Length is automatic, set below. continue case "connection", "proxy-connection", "transfer-encoding", "upgrade", "keep-alive": - + // Per 8.1.2.2 Connection-Specific Header + // Fields, don't send connection-specific + // fields. We have already checked if any + // are error-worthy so just ignore the rest. continue case "user-agent": - + // Match Go's http1 behavior: at most one + // User-Agent. If set to nil or empty string, + // then omit it. Otherwise if not mentioned, + // include the default (below). didUA = true if len(vv) < 1 { continue @@ -7300,7 +7667,8 @@ func http2shouldSendReqContentLength(method string, contentLength int64) bool { if contentLength < 0 { return false } - + // For zero bodies, whether we send a content-length depends on the method. + // It also kinda doesn't matter for http2 either way, with END_STREAM. switch method { case "POST", "PUT", "PATCH": return true @@ -7313,7 +7681,8 @@ func http2shouldSendReqContentLength(method string, contentLength int64) bool { func (cc *http2ClientConn) encodeTrailers(req *Request) []byte { cc.hbuf.Reset() for k, vv := range req.Trailer { - + // Transfer-Encoding, etc.. have already been filter at the + // start of RoundTrip lowKey := strings.ToLower(k) for _, v := range vv { cc.writeHeader(lowKey, v) @@ -7367,7 +7736,7 @@ func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStr cc.idleTimer.Reset(cc.idleTimeout) } close(cs.done) - cc.cond.Broadcast() + cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl } return cs } @@ -7426,6 +7795,9 @@ func (rl *http2clientConnReadLoop) cleanup() { cc.idleTimer.Stop() } + // Close any response bodies if the server closes prematurely. + // TODO: also do this if we've written the headers but not + // gotten a response yet. err := cc.readerErr cc.mu.Lock() if cc.goAway != nil && http2isEOFOrNetReadError(err) { @@ -7455,7 +7827,7 @@ func (rl *http2clientConnReadLoop) cleanup() { func (rl *http2clientConnReadLoop) run() error { cc := rl.cc rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse - gotReply := false + gotReply := false // ever saw a HEADERS reply gotSettings := false for { f, err := cc.fr.ReadFrame() @@ -7463,7 +7835,7 @@ func (rl *http2clientConnReadLoop) run() error { cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) } if se, ok := err.(http2StreamError); ok { - if cs := cc.streamByID(se.StreamID, true); cs != nil { + if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil { cs.cc.writeStreamReset(cs.ID, se.Code, err) if se.Cause == nil { se.Cause = cc.fr.errDetail @@ -7484,7 +7856,7 @@ func (rl *http2clientConnReadLoop) run() error { } gotSettings = true } - maybeIdle := false + maybeIdle := false // whether frame might transition us to idle switch f := f.(type) { case *http2MetaHeadersFrame: @@ -7527,12 +7899,17 @@ func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) erro cc := rl.cc cs := cc.streamByID(f.StreamID, f.StreamEnded()) if cs == nil { - + // We'd get here if we canceled a request while the + // server had its response still in flight. So if this + // was just something we canceled, ignore it. return nil } if !cs.firstByte { if cs.trace != nil { - + // TODO(bradfitz): move first response byte earlier, + // when we first read the 9 byte header, not waiting + // until all the HEADERS+CONTINUATION frames have been + // merged. This works for now. http2traceFirstResponseByte(cs.trace) } cs.firstByte = true @@ -7548,13 +7925,13 @@ func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) erro if _, ok := err.(http2ConnectionError); ok { return err } - + // Any other error type is a stream error. cs.cc.writeStreamReset(f.StreamID, http2ErrCodeProtocol, err) cs.resc <- http2resAndError{err: err} - return nil + return nil // return nil from process* funcs to keep conn alive } if res == nil { - + // (nil, nil) special case. See handleResponse docs. return nil } if res.Body != http2noBody { @@ -7589,9 +7966,9 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http if statusCode == 100 { http2traceGot100Continue(cs.trace) if cs.on100 != nil { - cs.on100() + cs.on100() // forces any write delay timer to fire } - cs.pastHeaders = false + cs.pastHeaders = false // do it all again return nil, nil } @@ -7627,10 +8004,12 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil { res.ContentLength = clen64 } else { - + // TODO: care? unlike http/1, it won't mess up our framing, so it's + // more safe smuggling-wise to ignore. } } else if len(clens) > 1 { - + // TODO: care? unlike http/1, it won't mess up our framing, so it's + // more safe smuggling-wise to ignore. } } @@ -7656,16 +8035,18 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http func (rl *http2clientConnReadLoop) processTrailers(cs *http2clientStream, f *http2MetaHeadersFrame) error { if cs.pastTrailers { - + // Too many HEADERS frames for this stream. return http2ConnectionError(http2ErrCodeProtocol) } cs.pastTrailers = true if !f.StreamEnded() { - + // We expect that any headers for trailers also + // has END_STREAM. return http2ConnectionError(http2ErrCodeProtocol) } if len(f.PseudoFields()) > 0 { - + // No pseudo header fields are defined for trailers. + // TODO: ConnectionError might be overly harsh? Check. return http2ConnectionError(http2ErrCodeProtocol) } @@ -7713,7 +8094,7 @@ func (b http2transportResponseBody) Read(p []byte) (n int, err error) { } } if n == 0 { - + // No flow control tokens to send back. return } @@ -7721,13 +8102,15 @@ func (b http2transportResponseBody) Read(p []byte) (n int, err error) { defer cc.mu.Unlock() var connAdd, streamAdd int32 - + // Check the conn-level first, before the stream-level. if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 { connAdd = http2transportDefaultConnFlow - v cc.inflow.add(connAdd) } - if err == nil { - + if err == nil { // No need to refresh if the stream is over or failed. + // Consider any buffered body data (read from the conn but not + // consumed by the client) when computing flow control for this + // stream. v := int(cs.inflow.available()) + cs.bufPipe.Len() if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh { streamAdd = int32(http2transportDefaultStreamFlow - v) @@ -7764,7 +8147,7 @@ func (b http2transportResponseBody) Close() error { cc.fr.WriteRSTStream(cs.ID, http2ErrCodeCancel) cs.didReset = true } - + // Return connection-level flow control. if unread > 0 { cc.inflow.add(int32(unread)) cc.fr.WriteWindowUpdate(0, uint32(unread)) @@ -7787,11 +8170,16 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { neverSent := cc.nextStreamID cc.mu.Unlock() if f.StreamID >= neverSent { - + // We never asked for this. cc.logf("http2: Transport received unsolicited DATA frame; closing connection") return http2ConnectionError(http2ErrCodeProtocol) } + // We probably did ask for this, but canceled. Just ignore it. + // TODO: be stricter here? only silently ignore things which + // we canceled, but not things which were closed normally + // by the peer? Tough without accumulating too much state. + // But at least return their flow control: if f.Length > 0 { cc.mu.Lock() cc.inflow.add(int32(f.Length)) @@ -7805,7 +8193,7 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { return nil } if f.Length > 0 { - + // Check connection-level flow control. cc.mu.Lock() if cs.inflow.available() >= int32(f.Length) { cs.inflow.take(int32(f.Length)) @@ -7813,7 +8201,8 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { cc.mu.Unlock() return http2ConnectionError(http2ErrCodeFlowControl) } - + // Return any padded flow control now, since we won't + // refund it later on body reads. if pad := int32(f.Length) - int32(len(data)); pad > 0 { cs.inflow.add(pad) cc.inflow.add(pad) @@ -7843,7 +8232,8 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { var http2errInvalidTrailers = errors.New("http2: invalid trailers") func (rl *http2clientConnReadLoop) endStream(cs *http2clientStream) { - + // TODO: check that any declared content-length matches, like + // server.go's (*stream).endStream method. rl.endStreamError(cs, nil) } @@ -7879,7 +8269,7 @@ func (rl *http2clientConnReadLoop) processGoAway(f *http2GoAwayFrame) error { cc := rl.cc cc.t.connPool().MarkDead(cc) if f.ErrCode != 0 { - + // TODO: deal with GOAWAY more. particularly the error code cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode) } cc.setGoAway(f) @@ -7906,11 +8296,17 @@ func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error case http2SettingMaxConcurrentStreams: cc.maxConcurrentStreams = s.Val case http2SettingInitialWindowSize: - + // Values above the maximum flow-control + // window size of 2^31-1 MUST be treated as a + // connection error (Section 5.4.1) of type + // FLOW_CONTROL_ERROR. if s.Val > math.MaxInt32 { return http2ConnectionError(http2ErrCodeFlowControl) } + // Adjust flow control of currently-open + // frames by the difference of the old initial + // window size and this one. delta := int32(s.Val) - int32(cc.initialWindowSize) for _, cs := range cc.streams { cs.flow.add(delta) @@ -7919,7 +8315,7 @@ func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error cc.initialWindowSize = s.Val default: - + // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably. cc.vlogf("Unhandled Setting: %v", s) } return nil @@ -7960,18 +8356,21 @@ func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) error { cs := rl.cc.streamByID(f.StreamID, true) if cs == nil { - + // TODO: return error if server tries to RST_STEAM an idle stream return nil } select { case <-cs.peerReset: - + // Already reset. + // This is the only goroutine + // which closes this, so there + // isn't a race. default: err := http2streamError(cs.ID, f.ErrCode) cs.resetErr = err close(cs.peerReset) cs.bufPipe.CloseWithError(err) - cs.cc.cond.Broadcast() + cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl } delete(rl.activeRes, cs.ID) return nil @@ -7988,7 +8387,7 @@ func (cc *http2ClientConn) ping(ctx http2contextContext) error { return err } cc.mu.Lock() - + // check for dup before insert if _, found := cc.pings[p]; !found { cc.pings[p] = c cc.mu.Unlock() @@ -8012,7 +8411,7 @@ func (cc *http2ClientConn) ping(ctx http2contextContext) error { case <-ctx.Done(): return ctx.Err() case <-cc.readerDone: - + // connection closed return cc.readerErr } } @@ -8022,7 +8421,7 @@ func (rl *http2clientConnReadLoop) processPing(f *http2PingFrame) error { cc := rl.cc cc.mu.Lock() defer cc.mu.Unlock() - + // If ack, notify listener if any if c, ok := cc.pings[f.Data]; ok { close(c) delete(cc.pings, f.Data) @@ -8039,12 +8438,21 @@ func (rl *http2clientConnReadLoop) processPing(f *http2PingFrame) error { } func (rl *http2clientConnReadLoop) processPushPromise(f *http2PushPromiseFrame) error { - + // We told the peer we don't want them. + // Spec says: + // "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH + // setting of the peer endpoint is set to 0. An endpoint that + // has set this setting and has received acknowledgement MUST + // treat the receipt of a PUSH_PROMISE frame as a connection + // error (Section 5.4.1) of type PROTOCOL_ERROR." return http2ConnectionError(http2ErrCodeProtocol) } func (cc *http2ClientConn) writeStreamReset(streamID uint32, code http2ErrCode, err error) { - + // TODO: map err to more interesting error codes, once the + // HTTP community comes up with some. But currently for + // RST_STREAM there's no equivalent to GOAWAY frame's debug + // data, and the error codes are all pretty vague ("cancel"). cc.wmu.Lock() cc.fr.WriteRSTStream(streamID, code) cc.bw.Flush() @@ -8173,7 +8581,8 @@ func (s http2bodyWriterState) cancel() { func (s http2bodyWriterState) on100() { if s.timer == nil { - + // If we didn't do a delayed write, ignore the server's + // bogus 100 continue response. return } s.timer.Stop() @@ -8185,7 +8594,9 @@ func (s http2bodyWriterState) on100() { // called until after the headers have been written. func (s http2bodyWriterState) scheduleBodyWrite() { if s.timer == nil { - + // We're not doing a delayed write (see + // getBodyWriterState), so just start the writing + // goroutine immediately. go s.fn() return } @@ -8240,7 +8651,9 @@ func http2writeEndsStream(w http2writeFramer) bool { case *http2writeResHeaders: return v.endStream case nil: - + // This can only happen if the caller reuses w after it's + // been intentionally nil'ed out to prevent use. Keep this + // here to catch future refactoring breaking it. panic("writeEndsStream called on nil writeFramer") } return false @@ -8274,7 +8687,7 @@ type http2writeGoAway struct { func (p *http2writeGoAway) writeFrame(ctx http2writeContext) error { err := ctx.Framer().WriteGoAway(p.maxStreamID, p.code, nil) if p.code != 0 { - ctx.Flush() + ctx.Flush() // ignore error: we're hanging up on them anyway time.Sleep(50 * time.Millisecond) ctx.CloseConn() } @@ -8386,7 +8799,13 @@ func http2encKV(enc *hpack.Encoder, k, v string) { } func (w *http2writeResHeaders) staysWithinBuffer(max int) bool { - + // TODO: this is a common one. It'd be nice to return true + // here and get into the fast path if we could be clever and + // calculate the size fast enough, or at least a conservative + // uppper bound that usually fires. (Maybe if w.h and + // w.trailers are nil, so we don't need to enumerate it.) + // Otherwise I'm afraid that just calculating the length to + // answer this question would be slower than the ~2µs benefit. return false } @@ -8445,7 +8864,7 @@ type http2writePushPromise struct { } func (w *http2writePushPromise) staysWithinBuffer(max int) bool { - + // TODO: see writeResHeaders.staysWithinBuffer return false } @@ -8497,7 +8916,7 @@ func (w http2write100ContinueHeadersFrame) writeFrame(ctx http2writeContext) err } func (w http2write100ContinueHeadersFrame) staysWithinBuffer(max int) bool { - + // Sloppy but conservative: return 9+2*(len(":status")+len("100")) <= max } @@ -8517,7 +8936,9 @@ func (wu http2writeWindowUpdate) writeFrame(ctx http2writeContext) error { func http2encodeHeaders(enc *hpack.Encoder, h Header, keys []string) { if keys == nil { sorter := http2sorterPool.Get().(*http2sorter) - + // Using defer here, since the returned keys from the + // sorter.Keys method is only valid until the sorter + // is returned: defer http2sorterPool.Put(sorter) keys = sorter.Keys(h) } @@ -8525,16 +8946,19 @@ func http2encodeHeaders(enc *hpack.Encoder, h Header, keys []string) { vv := h[k] k = http2lowerHeader(k) if !http2validWireHeaderFieldName(k) { - + // Skip it as backup paranoia. Per + // golang.org/issue/14048, these should + // already be rejected at a higher level. continue } isTE := k == "transfer-encoding" for _, v := range vv { if !httplex.ValidHeaderFieldValue(v) { - + // TODO: return an error? golang.org/issue/14048 + // For now just omit it. continue } - + // TODO: more of "8.1.2.2 Connection-Specific Header Fields" if isTE && v != "trailers" { continue } @@ -8602,7 +9026,10 @@ type http2FrameWriteRequest struct { func (wr http2FrameWriteRequest) StreamID() uint32 { if wr.stream == nil { if se, ok := wr.write.(http2StreamError); ok { - + // (*serverConn).resetStream doesn't set + // stream because it doesn't necessarily have + // one. So special case this type of write + // message. return se.StreamID } return 0 @@ -8632,11 +9059,13 @@ func (wr http2FrameWriteRequest) DataSize() int { func (wr http2FrameWriteRequest) Consume(n int32) (http2FrameWriteRequest, http2FrameWriteRequest, int) { var empty http2FrameWriteRequest + // Non-DATA frames are always consumed whole. wd, ok := wr.write.(*http2writeData) if !ok || len(wd.p) == 0 { return wr, empty, 1 } + // Might need to split after applying limits. allowed := wr.stream.flow.available() if n < allowed { allowed = n @@ -8654,10 +9083,13 @@ func (wr http2FrameWriteRequest) Consume(n int32) (http2FrameWriteRequest, http2 write: &http2writeData{ streamID: wd.streamID, p: wd.p[:allowed], - + // Even if the original had endStream set, there + // are bytes remaining because len(wd.p) > allowed, + // so we know endStream is false. endStream: false, }, - + // Our caller is blocking on the final DATA frame, not + // this intermediate frame, so no need to wait. done: nil, } rest := http2FrameWriteRequest{ @@ -8672,6 +9104,8 @@ func (wr http2FrameWriteRequest) Consume(n int32) (http2FrameWriteRequest, http2 return consumed, rest, 2 } + // The frame is consumed whole. + // NB: This cast cannot overflow because allowed is <= math.MaxInt32. wr.stream.flow.take(int32(len(wd.p))) return wr, empty, 1 } @@ -8698,7 +9132,7 @@ func (wr *http2FrameWriteRequest) replyToWriter(err error) { default: panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write)) } - wr.write = nil + wr.write = nil // prevent use (assume it's tainted after wr.done send) } // writeQueue is used by implementations of WriteScheduler. @@ -8717,7 +9151,7 @@ func (q *http2writeQueue) shift() http2FrameWriteRequest { panic("invalid use of queue") } wr := q.s[0] - + // TODO: less copy-happy queue. copy(q.s, q.s[1:]) q.s[len(q.s)-1] = http2FrameWriteRequest{} q.s = q.s[:len(q.s)-1] @@ -8746,6 +9180,8 @@ func (q *http2writeQueue) consume(n int32) (http2FrameWriteRequest, bool) { type http2writeQueuePool []*http2writeQueue +// put inserts an unused writeQueue into the pool. + // put inserts an unused writeQueue into the pool. func (p *http2writeQueuePool) put(q *http2writeQueue) { for i := range q.s { @@ -8769,7 +9205,7 @@ func (p *http2writeQueuePool) get() *http2writeQueue { } // RFC 7540, Section 5.3.5: the default weight is 16. -const http2priorityDefaultWeight = 15 // 16 = 15 + 1 +const http2priorityDefaultWeight = 15 // PriorityWriteSchedulerConfig configures a priorityWriteScheduler. type http2PriorityWriteSchedulerConfig struct { @@ -8815,7 +9251,8 @@ type http2PriorityWriteSchedulerConfig struct { // If cfg is nil, default options are used. func http2NewPriorityWriteScheduler(cfg *http2PriorityWriteSchedulerConfig) http2WriteScheduler { if cfg == nil { - + // For justification of these defaults, see: + // https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY cfg = &http2PriorityWriteSchedulerConfig{ MaxClosedNodesInTree: 10, MaxIdleNodesInTree: 10, @@ -8870,7 +9307,7 @@ func (n *http2priorityNode) setParent(parent *http2priorityNode) { if n.parent == parent { return } - + // Unlink from current parent. if parent := n.parent; parent != nil { if n.prev == nil { parent.kids = n.next @@ -8881,7 +9318,9 @@ func (n *http2priorityNode) setParent(parent *http2priorityNode) { n.next.prev = n.prev } } - + // Link to new parent. + // If parent=nil, remove n from the tree. + // Always insert at the head of parent.kids (this is assumed by walkReadyInOrder). n.parent = parent if parent == nil { n.next = nil @@ -8917,10 +9356,15 @@ func (n *http2priorityNode) walkReadyInOrder(openParent bool, tmp *[]*http2prior return false } + // Don't consider the root "open" when updating openParent since + // we can't send data frames on the root stream (only control frames). if n.id != 0 { openParent = openParent || (n.state == http2priorityNodeOpen) } + // Common case: only one kid or all kids have the same weight. + // Some clients don't use weights; other clients (like web browsers) + // use mostly-linear priority trees. w := n.kids.weight needSort := false for k := n.kids.next; k != nil; k = k.next { @@ -8938,6 +9382,8 @@ func (n *http2priorityNode) walkReadyInOrder(openParent bool, tmp *[]*http2prior return false } + // Uncommon case: sort the child nodes. We remove the kids from the parent, + // then re-insert after sorting so we can reuse tmp for future sort calls. *tmp = (*tmp)[:0] for n.kids != nil { *tmp = append(*tmp, n.kids) @@ -8945,7 +9391,7 @@ func (n *http2priorityNode) walkReadyInOrder(openParent bool, tmp *[]*http2prior } sort.Sort(http2sortPriorityNodeSiblings(*tmp)) for i := len(*tmp) - 1; i >= 0; i-- { - (*tmp)[i].setParent(n) + (*tmp)[i].setParent(n) // setParent inserts at the head of n.kids } for k := n.kids; k != nil; k = k.next { if k.walkReadyInOrder(openParent, tmp, f) { @@ -8962,7 +9408,8 @@ func (z http2sortPriorityNodeSiblings) Len() int { return len(z) } func (z http2sortPriorityNodeSiblings) Swap(i, k int) { z[i], z[k] = z[k], z[i] } func (z http2sortPriorityNodeSiblings) Less(i, k int) bool { - + // Prefer the subtree that has sent fewer bytes relative to its weight. + // See sections 5.3.2 and 5.3.4. wi, bi := float64(z[i].weight+1), float64(z[i].subtreeBytes) wk, bk := float64(z[k].weight+1), float64(z[k].subtreeBytes) if bi == 0 && bk == 0 { @@ -9004,7 +9451,7 @@ type http2priorityWriteScheduler struct { } func (ws *http2priorityWriteScheduler) OpenStream(streamID uint32, options http2OpenStreamOptions) { - + // The stream may be currently idle but cannot be opened or closed. if curr := ws.nodes[streamID]; curr != nil { if curr.state != http2priorityNodeIdle { panic(fmt.Sprintf("stream %d already opened", streamID)) @@ -9013,6 +9460,10 @@ func (ws *http2priorityWriteScheduler) OpenStream(streamID uint32, options http2 return } + // RFC 7540, Section 5.3.5: + // "All streams are initially assigned a non-exclusive dependency on stream 0x0. + // Pushed streams initially depend on their associated stream. In both cases, + // streams are assigned a default weight of 16." parent := ws.nodes[options.PusherID] if parent == nil { parent = &ws.root @@ -9060,6 +9511,9 @@ func (ws *http2priorityWriteScheduler) AdjustStream(streamID uint32, priority ht panic("adjustPriority on root") } + // If streamID does not exist, there are two cases: + // - A closed stream that has been removed (this will have ID <= maxID) + // - An idle stream that is being used for "grouping" (this will have ID > maxID) n := ws.nodes[streamID] if n == nil { if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 { @@ -9077,6 +9531,8 @@ func (ws *http2priorityWriteScheduler) AdjustStream(streamID uint32, priority ht ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n) } + // Section 5.3.1: A dependency on a stream that is not currently in the tree + // results in that stream being given a default priority (Section 5.3.5). parent := ws.nodes[priority.StreamDep] if parent == nil { n.setParent(&ws.root) @@ -9084,10 +9540,18 @@ func (ws *http2priorityWriteScheduler) AdjustStream(streamID uint32, priority ht return } + // Ignore if the client tries to make a node its own parent. if n == parent { return } + // Section 5.3.3: + // "If a stream is made dependent on one of its own dependencies, the + // formerly dependent stream is first moved to be dependent on the + // reprioritized stream's previous parent. The moved dependency retains + // its weight." + // + // That is: if parent depends on n, move parent to depend on n.parent. for x := parent.parent; x != nil; x = x.parent { if x == n { parent.setParent(n.parent) @@ -9095,6 +9559,9 @@ func (ws *http2priorityWriteScheduler) AdjustStream(streamID uint32, priority ht } } + // Section 5.3.3: The exclusive flag causes the stream to become the sole + // dependency of its parent stream, causing other dependencies to become + // dependent on the exclusive stream. if priority.Exclusive { k := parent.kids for k != nil { @@ -9117,7 +9584,11 @@ func (ws *http2priorityWriteScheduler) Push(wr http2FrameWriteRequest) { } else { n = ws.nodes[id] if n == nil { - + // id is an idle or closed stream. wr should not be a HEADERS or + // DATA frame. However, wr can be a RST_STREAM. In this case, we + // push wr onto the root, rather than creating a new priorityNode, + // since RST_STREAM is tiny and the stream's priority is unknown + // anyway. See issue #17919. if wr.DataSize() > 0 { panic("add DATA on non-open stream") } @@ -9138,7 +9609,9 @@ func (ws *http2priorityWriteScheduler) Pop() (wr http2FrameWriteRequest, ok bool return false } n.addBytes(int64(wr.DataSize())) - + // If B depends on A and B continuously has data available but A + // does not, gradually increase the throttling limit to allow B to + // steal more and more bandwidth from A. if openParent { ws.writeThrottleLimit += 1024 if ws.writeThrottleLimit < 0 { @@ -9157,7 +9630,7 @@ func (ws *http2priorityWriteScheduler) addClosedOrIdleNode(list *[]*http2priorit return } if len(*list) == maxSize { - + // Remove the oldest node, then shift left. ws.removeNode((*list)[0]) x := (*list)[1:] copy(*list, x) @@ -9195,7 +9668,7 @@ type http2randomWriteScheduler struct { } func (ws *http2randomWriteScheduler) OpenStream(streamID uint32, options http2OpenStreamOptions) { - + // no-op: idle streams are not tracked } func (ws *http2randomWriteScheduler) CloseStream(streamID uint32) { @@ -9208,7 +9681,7 @@ func (ws *http2randomWriteScheduler) CloseStream(streamID uint32) { } func (ws *http2randomWriteScheduler) AdjustStream(streamID uint32, priority http2PriorityParam) { - + // no-op: priorities are ignored } func (ws *http2randomWriteScheduler) Push(wr http2FrameWriteRequest) { @@ -9226,11 +9699,11 @@ func (ws *http2randomWriteScheduler) Push(wr http2FrameWriteRequest) { } func (ws *http2randomWriteScheduler) Pop() (http2FrameWriteRequest, bool) { - + // Control frames first. if !ws.zero.empty() { return ws.zero.shift(), true } - + // Iterate over all non-idle streams until finding one that can be consumed. for _, q := range ws.sq { if wr, ok := q.consume(math.MaxInt32); ok { return wr, true -- 2.48.1