// Defaults to 15s.
PingTimeout time.Duration
+ // WriteByteTimeout is the timeout after which the connection will be
+ // closed no data can be written to it. The timeout begins when data is
+ // available to write, and is extended whenever any bytes are written.
+ WriteByteTimeout time.Duration
+
// CountError, if non-nil, is called on HTTP/2 transport errors.
// It's intended to increment a metric for monitoring, such
// as an expvar or Prometheus metric.
// clientStream is the state for a single HTTP/2 stream. One of these
// is created for each Transport.RoundTrip call.
type http2clientStream struct {
- cc *http2ClientConn
- req *Request
+ cc *http2ClientConn
+
+ // Fields of Request that we may access even after the response body is closed.
+ ctx context.Context
+ reqCancel <-chan struct{}
+
trace *httptrace.ClientTrace // or nil
ID uint32
bufPipe http2pipe // buffered pipe with the flow-controlled response payload
requestedGzip bool
+ isHead bool
abortOnce sync.Once
abort chan struct{} // closed to signal stream should end immediately
inflow http2flow // guarded by cc.mu
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
readErr error // sticky read error; owned by transportResponseBody.Read
- stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
+
+ reqBody io.ReadCloser
+ reqBodyContentLength int64 // -1 means unknown
+ reqBodyClosed bool // body has been closed; guarded by cc.mu
// owned by writeRequest:
sentEndStream bool // sent an END_STREAM flag to the peer
cs.abortErr = err
close(cs.abort)
})
+ if cs.reqBody != nil && !cs.reqBodyClosed {
+ cs.reqBody.Close()
+ cs.reqBodyClosed = true
+ }
// TODO(dneil): Clean up tests where cs.cc.cond is nil.
if cs.cc.cond != nil {
// Wake up writeRequestBody if it is waiting on flow control.
}
}
-func (cs *http2clientStream) abortRequestBodyWrite(err error) {
- if err == nil {
- panic("nil error")
- }
+func (cs *http2clientStream) abortRequestBodyWrite() {
cc := cs.cc
cc.mu.Lock()
- if cs.stopReqBody == nil {
- cs.stopReqBody = err
+ defer cc.mu.Unlock()
+ if cs.reqBody != nil && !cs.reqBodyClosed {
+ cs.reqBody.Close()
+ cs.reqBodyClosed = true
cc.cond.Broadcast()
}
- cc.mu.Unlock()
}
type http2stickyErrWriter struct {
- w io.Writer
- err *error
+ conn net.Conn
+ timeout time.Duration
+ err *error
}
func (sew http2stickyErrWriter) Write(p []byte) (n int, err error) {
if *sew.err != nil {
return 0, *sew.err
}
- n, err = sew.w.Write(p)
- *sew.err = err
- return
+ for {
+ if sew.timeout != 0 {
+ sew.conn.SetWriteDeadline(time.Now().Add(sew.timeout))
+ }
+ nn, err := sew.conn.Write(p[n:])
+ n += nn
+ if n < len(p) && nn > 0 && errors.Is(err, os.ErrDeadlineExceeded) {
+ // Keep extending the deadline so long as we're making progress.
+ continue
+ }
+ if sew.timeout != 0 {
+ sew.conn.SetWriteDeadline(time.Time{})
+ }
+ *sew.err = err
+ return n, err
+ }
}
// noCachedConnError is the concrete type of ErrNoCachedConn, which
// TODO: adjust this writer size to account for frame size +
// MTU + crypto/tls record padding.
- cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr})
+ cc.bw = bufio.NewWriter(http2stickyErrWriter{
+ conn: c,
+ timeout: t.WriteByteTimeout,
+ err: &cc.werr,
+ })
cc.br = bufio.NewReader(c)
cc.fr = http2NewFramer(cc.bw, cc.br)
if t.CountError != nil {
return true
}
+// ClientConnState describes the state of a ClientConn.
+type http2ClientConnState struct {
+ // Closed is whether the connection is closed.
+ Closed bool
+
+ // Closing is whether the connection is in the process of
+ // closing. It may be closing due to shutdown, being a
+ // single-use connection, being marked as DoNotReuse, or
+ // having received a GOAWAY frame.
+ Closing bool
+
+ // StreamsActive is how many streams are active.
+ StreamsActive int
+
+ // StreamsReserved is how many streams have been reserved via
+ // ClientConn.ReserveNewRequest.
+ StreamsReserved int
+
+ // StreamsPending is how many requests have been sent in excess
+ // of the peer's advertised MaxConcurrentStreams setting and
+ // are waiting for other streams to complete.
+ StreamsPending int
+
+ // MaxConcurrentStreams is how many concurrent streams the
+ // peer advertised as acceptable. Zero means no SETTINGS
+ // frame has been received yet.
+ MaxConcurrentStreams uint32
+
+ // LastIdle, if non-zero, is when the connection last
+ // transitioned to idle state.
+ LastIdle time.Time
+}
+
+// State returns a snapshot of cc's state.
+func (cc *http2ClientConn) State() http2ClientConnState {
+ cc.wmu.Lock()
+ maxConcurrent := cc.maxConcurrentStreams
+ if !cc.seenSettings {
+ maxConcurrent = 0
+ }
+ cc.wmu.Unlock()
+
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ return http2ClientConnState{
+ Closed: cc.closed,
+ Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
+ StreamsActive: len(cc.streams),
+ StreamsReserved: cc.streamsReserved,
+ StreamsPending: cc.pendingRequests,
+ LastIdle: cc.lastIdle,
+ MaxConcurrentStreams: maxConcurrent,
+ }
+}
+
// clientConnIdleState describes the suitability of a client
// connection to initiate a new RoundTrip request.
type http2clientConnIdleState struct {
func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
ctx := req.Context()
cs := &http2clientStream{
- cc: cc,
- req: req,
- trace: httptrace.ContextClientTrace(req.Context()),
- peerClosed: make(chan struct{}),
- abort: make(chan struct{}),
- respHeaderRecv: make(chan struct{}),
- donec: make(chan struct{}),
- }
- go cs.doRequest()
+ cc: cc,
+ ctx: ctx,
+ reqCancel: req.Cancel,
+ isHead: req.Method == "HEAD",
+ reqBody: req.Body,
+ reqBodyContentLength: http2actualContentLength(req),
+ trace: httptrace.ContextClientTrace(ctx),
+ peerClosed: make(chan struct{}),
+ abort: make(chan struct{}),
+ respHeaderRecv: make(chan struct{}),
+ donec: make(chan struct{}),
+ }
+ go cs.doRequest(req)
waitDone := func() error {
select {
return nil
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return http2errRequestCanceled
}
}
// doesn't, they'll RST_STREAM us soon enough. This is a
// heuristic to avoid adding knobs to Transport. Hopefully
// we can keep it.
- cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
+ cs.abortRequestBodyWrite()
}
res.Request = req
res.TLS = cc.tlsState
waitDone()
return nil, cs.abortErr
case <-ctx.Done():
- return nil, ctx.Err()
- case <-req.Cancel:
+ err := ctx.Err()
+ cs.abortStream(err)
+ return nil, err
+ case <-cs.reqCancel:
+ cs.abortStream(http2errRequestCanceled)
return nil, http2errRequestCanceled
}
}
// doRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
-func (cs *http2clientStream) doRequest() {
- err := cs.writeRequest()
+func (cs *http2clientStream) doRequest(req *Request) {
+ err := cs.writeRequest(req)
cs.cleanupWriteRequest(err)
}
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
-func (cs *http2clientStream) writeRequest() (err error) {
+func (cs *http2clientStream) writeRequest(req *Request) (err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
- if err := http2checkConnHeaders(cs.req); err != nil {
+ if err := http2checkConnHeaders(req); err != nil {
return err
}
}
select {
case cc.reqHeaderMu <- struct{}{}:
- case <-req.Cancel:
+ case <-cs.reqCancel:
return http2errRequestCanceled
case <-ctx.Done():
return ctx.Err()
if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
- req.Method != "HEAD" {
+ !cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
continueTimeout := cc.t.expectContinueTimeout()
if continueTimeout != 0 &&
!httpguts.HeaderValuesContainsToken(
- cs.req.Header["Expect"],
+ req.Header["Expect"],
"100-continue") {
continueTimeout = 0
cs.on100 = make(chan struct{}, 1)
}
- err = cs.encodeAndWriteHeaders()
+ // Past this point (where we send request headers), it is possible for
+ // RoundTrip to return successfully. Since the RoundTrip contract permits
+ // the caller to "mutate or reuse" the Request after closing the Response's Body,
+ // we must take care when referencing the Request from here on.
+ err = cs.encodeAndWriteHeaders(req)
<-cc.reqHeaderMu
if err != nil {
return err
}
- hasBody := http2actualContentLength(cs.req) != 0
+ hasBody := cs.reqBodyContentLength != 0
if !hasBody {
cs.sentEndStream = true
} else {
err = cs.abortErr
case <-ctx.Done():
err = ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
err = http2errRequestCanceled
}
timer.Stop()
}
}
- if err = cs.writeRequestBody(req.Body); err != nil {
+ if err = cs.writeRequestBody(req); err != nil {
if err != http2errStopReqBodyWrite {
http2traceWroteRequest(cs.trace, err)
return err
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return http2errRequestCanceled
}
}
}
-func (cs *http2clientStream) encodeAndWriteHeaders() error {
+func (cs *http2clientStream) encodeAndWriteHeaders(req *Request) error {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.wmu.Lock()
defer cc.wmu.Unlock()
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return http2errRequestCanceled
default:
}
// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
// sent by writeRequestBody below, along with any Trailers,
// again in form HEADERS{1}, CONTINUATION{0,})
- trailers, err := http2commaSeparatedTrailers(cs.req)
+ trailers, err := http2commaSeparatedTrailers(req)
if err != nil {
return err
}
hasTrailers := trailers != ""
- contentLen := http2actualContentLength(cs.req)
+ contentLen := http2actualContentLength(req)
hasBody := contentLen != 0
- hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
+ hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
if err != nil {
return err
}
// cleanupWriteRequest will send a reset to the peer.
func (cs *http2clientStream) cleanupWriteRequest(err error) {
cc := cs.cc
- req := cs.req
if cs.ID == 0 {
// We were canceled before creating the stream, so return our reservation.
// Request.Body is closed by the Transport,
// and in multiple cases: server replies <=299 and >299
// while still writing request body
- if req.Body != nil {
- if e := req.Body.Close(); err == nil {
- err = e
- }
+ cc.mu.Lock()
+ bodyClosed := cs.reqBodyClosed
+ cs.reqBodyClosed = true
+ cc.mu.Unlock()
+ if !bodyClosed && cs.reqBody != nil {
+ cs.reqBody.Close()
}
if err != nil && cs.sentEndStream {
if cs.ID != 0 {
cc.forgetStreamID(cs.ID)
}
- close(cs.donec)
cc.wmu.Lock()
werr := cc.werr
if werr != nil {
cc.Close()
}
+
+ close(cs.donec)
}
// awaitOpenSlotForStream waits until len(streams) < maxConcurrentStreams.
if n > max {
n = max
}
- if cl := http2actualContentLength(cs.req); cl != -1 && cl+1 < n {
+ if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
// Add an extra byte past the declared content-length to
// give the caller's Request.Body io.Reader a chance to
// give us more bytes than they declared, so we can catch it
var http2bufPool sync.Pool // of *[]byte
-func (cs *http2clientStream) writeRequestBody(body io.Reader) (err error) {
+func (cs *http2clientStream) writeRequestBody(req *Request) (err error) {
cc := cs.cc
+ body := cs.reqBody
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
- req := cs.req
hasTrailers := req.Trailer != nil
- remainLen := http2actualContentLength(req)
+ remainLen := cs.reqBodyContentLength
hasContentLen := remainLen != -1
cc.mu.Lock()
return err
}
}
- if err == io.EOF {
- sawEOF = true
- err = nil
- } else if err != nil {
- return err
+ if err != nil {
+ cc.mu.Lock()
+ bodyClosed := cs.reqBodyClosed
+ cc.mu.Unlock()
+ switch {
+ case bodyClosed:
+ return http2errStopReqBodyWrite
+ case err == io.EOF:
+ sawEOF = true
+ err = nil
+ default:
+ return err
+ }
}
remain := buf[:n]
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(len(remain))
- switch {
- case err == http2errStopReqBodyWrite:
- return err
- case err == http2errStopReqBodyWriteAndCancel:
- return err
- case err != nil:
+ if err != nil {
return err
}
cc.wmu.Lock()
return nil
}
+ // Since the RoundTrip contract permits the caller to "mutate or reuse"
+ // a request after the Response's Body is closed, verify that this hasn't
+ // happened before accessing the trailers.
+ cc.mu.Lock()
+ trailer := req.Trailer
+ err = cs.abortErr
+ cc.mu.Unlock()
+ if err != nil {
+ return err
+ }
+
cc.wmu.Lock()
+ defer cc.wmu.Unlock()
var trls []byte
- if hasTrailers {
- trls, err = cc.encodeTrailers(req)
+ if len(trailer) > 0 {
+ trls, err = cc.encodeTrailers(trailer)
if err != nil {
- cc.wmu.Unlock()
return err
}
}
- defer cc.wmu.Unlock()
// Two ways to send END_STREAM: either with trailers, or
// with an empty DATA frame.
// if the stream is dead.
func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, http2errClientConnClosed
}
- if cs.stopReqBody != nil {
- return 0, cs.stopReqBody
+ if cs.reqBodyClosed {
+ return 0, http2errStopReqBodyWrite
}
select {
case <-cs.abort:
return 0, cs.abortErr
case <-ctx.Done():
return 0, ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return 0, http2errRequestCanceled
default:
}
}
// requires cc.wmu be held.
-func (cc *http2ClientConn) encodeTrailers(req *Request) ([]byte, error) {
+func (cc *http2ClientConn) encodeTrailers(trailer Header) ([]byte, error) {
cc.hbuf.Reset()
hlSize := uint64(0)
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
for _, v := range vv {
hf := hpack.HeaderField{Name: k, Value: v}
hlSize += uint64(hf.Size())
return nil, http2errRequestHeaderListSize
}
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
lowKey, ascii := http2asciiToLower(k)
if !ascii {
// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
}
cc.closed = true
for _, cs := range cc.streams {
- cs.abortStreamLocked(err)
+ select {
+ case <-cs.peerClosed:
+ // The server closed the stream before closing the conn,
+ // so no need to interrupt it.
+ default:
+ cs.abortStreamLocked(err)
+ }
}
cc.cond.Broadcast()
cc.mu.Unlock()
return nil, nil
}
- streamEnded := f.StreamEnded()
- isHead := cs.req.Method == "HEAD"
- if !streamEnded || isHead {
- res.ContentLength = -1
- if clens := res.Header["Content-Length"]; len(clens) == 1 {
- if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
- res.ContentLength = int64(cl)
- } else {
- // TODO: care? unlike http/1, it won't mess up our framing, so it's
- // more safe smuggling-wise to ignore.
- }
- } else if len(clens) > 1 {
+ res.ContentLength = -1
+ if clens := res.Header["Content-Length"]; len(clens) == 1 {
+ if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
+ res.ContentLength = int64(cl)
+ } else {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
}
+ } else if len(clens) > 1 {
+ // TODO: care? unlike http/1, it won't mess up our framing, so it's
+ // more safe smuggling-wise to ignore.
+ } else if f.StreamEnded() && !cs.isHead {
+ res.ContentLength = 0
}
- if streamEnded || isHead {
+ if cs.isHead {
res.Body = http2noBody
return res, nil
}
+ if f.StreamEnded() {
+ if res.ContentLength > 0 {
+ res.Body = http2missingBody{}
+ } else {
+ res.Body = http2noBody
+ }
+ return res, nil
+ }
+
cs.bufPipe.setBuffer(&http2dataBuffer{expected: res.ContentLength})
cs.bytesRemain = res.ContentLength
res.Body = http2transportResponseBody{cs}
}
// transportResponseBody is the concrete type of Transport.RoundTrip's
-// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
-// On Close it sends RST_STREAM if EOF wasn't already seen.
+// Response.Body. It is an io.ReadCloser.
type http2transportResponseBody struct {
cs *http2clientStream
}
}
cc.mu.Unlock()
+ // TODO(dneil): Acquiring this mutex can block indefinitely.
+ // Move flow control return to a goroutine?
cc.wmu.Lock()
// Return connection-level flow control.
if unread > 0 {
select {
case <-cs.donec:
- case <-cs.req.Context().Done():
- return cs.req.Context().Err()
- case <-cs.req.Cancel:
+ case <-cs.ctx.Done():
+ return cs.ctx.Err()
+ case <-cs.reqCancel:
return http2errRequestCanceled
}
return nil
return nil
}
if f.Length > 0 {
- if cs.req.Method == "HEAD" && len(data) > 0 {
+ if cs.isHead && len(data) > 0 {
cc.logf("protocol error: received DATA on a HEAD request")
rl.endStreamError(cs, http2StreamError{
StreamID: f.StreamID,
// server.go's (*stream).endStream method.
if !cs.readClosed {
cs.readClosed = true
+ // Close cs.bufPipe and cs.peerClosed with cc.mu held to avoid a
+ // race condition: The caller can read io.EOF from Response.Body
+ // and close the body before we close cs.peerClosed, causing
+ // cleanupWriteRequest to send a RST_STREAM.
+ rl.cc.mu.Lock()
+ defer rl.cc.mu.Unlock()
cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
close(cs.peerClosed)
}
}
cc.mu.Unlock()
}
- cc.wmu.Lock()
- if err := cc.fr.WritePing(false, p); err != nil {
- cc.wmu.Unlock()
- return err
- }
- if err := cc.bw.Flush(); err != nil {
- cc.wmu.Unlock()
- return err
- }
- cc.wmu.Unlock()
+ errc := make(chan error, 1)
+ go func() {
+ cc.wmu.Lock()
+ defer cc.wmu.Unlock()
+ if err := cc.fr.WritePing(false, p); err != nil {
+ errc <- err
+ return
+ }
+ if err := cc.bw.Flush(); err != nil {
+ errc <- err
+ return
+ }
+ }()
select {
case <-c:
return nil
+ case err := <-errc:
+ return err
case <-ctx.Done():
return ctx.Err()
case <-cc.readerDone:
var http2noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
+type http2missingBody struct{}
+
+func (http2missingBody) Close() error { return nil }
+
+func (http2missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
+
func http2strSliceContains(ss []string, s string) bool {
for _, v := range ss {
if v == s {