From: Brad Fitzpatrick Date: Wed, 13 May 2015 19:41:56 +0000 (-0700) Subject: net/http: flush request body chunks in Transport X-Git-Tag: go1.5beta1~604 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=e5febf957f5be9d3325c2d851bff6ec7c55e4662;p=gostls13.git net/http: flush request body chunks in Transport The Transport's writer to the remote server is wrapped in a bufio.Writer to suppress many small writes while writing headers and trailers. However, when writing the request body, the buffering may get in the way if the request body is arriving slowly. Because the io.Copy from the Request.Body to the writer is already buffered, the outer bufio.Writer is unnecessary and prevents small Request.Body.Reads from going to the server right away. (and the io.Reader contract does say to return when you've got something, instead of blocking waiting for more). After the body is finished, the Transport's bufio.Writer is still used for any trailers following. A previous attempted fix for this made the chunk writer always flush if the underlying type was a bufio.Writer, but that is not quite correct. This CL instead makes it opt-in by using a private sentinel type (wrapping a *bufio.Writer) to the chunk writer that requests Flushes after each chunk body (the chunk header & chunk body are still buffered together into one write). Fixes #6574 Change-Id: Icefcdf17130c9e285c80b69af295bfd3e72c3a70 Reviewed-on: https://go-review.googlesource.com/10021 Reviewed-by: Andrew Gerrand Run-TryBot: Brad Fitzpatrick TryBot-Result: Gobot Gobot --- diff --git a/src/net/http/internal/chunked.go b/src/net/http/internal/chunked.go index 9294deb3e5..6d7c69874d 100644 --- a/src/net/http/internal/chunked.go +++ b/src/net/http/internal/chunked.go @@ -173,8 +173,12 @@ func (cw *chunkedWriter) Write(data []byte) (n int, err error) { err = io.ErrShortWrite return } - _, err = io.WriteString(cw.Wire, "\r\n") - + if _, err = io.WriteString(cw.Wire, "\r\n"); err != nil { + return + } + if bw, ok := cw.Wire.(*FlushAfterChunkWriter); ok { + err = bw.Flush() + } return } @@ -183,6 +187,15 @@ func (cw *chunkedWriter) Close() error { return err } +// FlushAfterChunkWriter signals from the caller of NewChunkedWriter +// that each chunk should be followed by a flush. It is used by the +// http.Transport code to keep the buffering behavior for headers and +// trailers, but flush out chunks aggressively in the middle for +// request bodies which may be generated slowly. See Issue 6574. +type FlushAfterChunkWriter struct { + *bufio.Writer +} + func parseHexUint(v []byte) (n uint64, err error) { for _, b := range v { n <<= 4 diff --git a/src/net/http/transfer.go b/src/net/http/transfer.go index 5640344345..289d53dec0 100644 --- a/src/net/http/transfer.go +++ b/src/net/http/transfer.go @@ -43,6 +43,7 @@ type transferWriter struct { Close bool TransferEncoding []string Trailer Header + IsResponse bool } func newTransferWriter(r interface{}) (t *transferWriter, err error) { @@ -89,6 +90,7 @@ func newTransferWriter(r interface{}) (t *transferWriter, err error) { } } case *Response: + t.IsResponse = true if rr.Request != nil { t.Method = rr.Request.Method } @@ -206,6 +208,9 @@ func (t *transferWriter) WriteBody(w io.Writer) error { // Write body if t.Body != nil { if chunked(t.TransferEncoding) { + if bw, ok := w.(*bufio.Writer); ok && !t.IsResponse { + w = &internal.FlushAfterChunkWriter{bw} + } cw := internal.NewChunkedWriter(w) _, err = io.Copy(cw, t.Body) if err == nil { diff --git a/src/net/http/transport_test.go b/src/net/http/transport_test.go index ace58896b8..ca1a3ab407 100644 --- a/src/net/http/transport_test.go +++ b/src/net/http/transport_test.go @@ -23,6 +23,7 @@ import ( "net/http/httptest" "net/url" "os" + "reflect" "runtime" "strconv" "strings" @@ -2447,6 +2448,104 @@ func TestTransportDialCancelRace(t *testing.T) { } } +// logWritesConn is a net.Conn that logs each Write call to writes +// and then proxies to w. +// It proxies Read calls to a reader it receives from rch. +type logWritesConn struct { + net.Conn // nil. crash on use. + + w io.Writer + + rch <-chan io.Reader + r io.Reader // nil until received by rch + + mu sync.Mutex + writes []string +} + +func (c *logWritesConn) Write(p []byte) (n int, err error) { + c.mu.Lock() + defer c.mu.Unlock() + c.writes = append(c.writes, string(p)) + return c.w.Write(p) +} + +func (c *logWritesConn) Read(p []byte) (n int, err error) { + if c.r == nil { + c.r = <-c.rch + } + return c.r.Read(p) +} + +func (c *logWritesConn) Close() error { return nil } + +// Issue 6574 +func TestTransportFlushesBodyChunks(t *testing.T) { + defer afterTest(t) + resBody := make(chan io.Reader, 1) + connr, connw := io.Pipe() // connection pipe pair + lw := &logWritesConn{ + rch: resBody, + w: connw, + } + tr := &Transport{ + Dial: func(network, addr string) (net.Conn, error) { + return lw, nil + }, + } + bodyr, bodyw := io.Pipe() // body pipe pair + go func() { + defer bodyw.Close() + for i := 0; i < 3; i++ { + fmt.Fprintf(bodyw, "num%d\n", i) + } + }() + resc := make(chan *Response) + go func() { + req, _ := NewRequest("POST", "http://localhost:8080", bodyr) + req.Header.Set("User-Agent", "x") // known value for test + res, err := tr.RoundTrip(req) + if err != nil { + t.Error("RoundTrip: %v", err) + close(resc) + return + } + resc <- res + + }() + // Fully consume the request before checking the Write log vs. want. + req, err := ReadRequest(bufio.NewReader(connr)) + if err != nil { + t.Fatal(err) + } + io.Copy(ioutil.Discard, req.Body) + + // Unblock the transport's roundTrip goroutine. + resBody <- strings.NewReader("HTTP/1.1 204 No Content\r\nConnection: close\r\n\r\n") + res, ok := <-resc + if !ok { + return + } + defer res.Body.Close() + + want := []string{ + // Because Request.ContentLength = 0, the body is sniffed for 1 byte to determine whether there's content. + // That explains the initial "num0" being split into "n" and "um0". + // The first byte is included with the request headers Write. Perhaps in the future + // we will want to flush the headers out early if the first byte of the request body is + // taking a long time to arrive. But not yet. + "POST / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: x\r\nTransfer-Encoding: chunked\r\nAccept-Encoding: gzip\r\n\r\n" + + "1\r\nn\r\n", + "4\r\num0\n\r\n", + "5\r\nnum1\n\r\n", + "5\r\nnum2\n\r\n", + "0\r\n\r\n", + } + if !reflect.DeepEqual(lw.writes, want) { + t.Errorf("Writes differed.\n Got: %q\nWant: %q\n", lw.writes, want) + } +} + func wantBody(res *http.Response, err error, want string) error { if err != nil { return err