]> Cypherpunks repositories - gostls13.git/commitdiff
net/http: flush request body chunks in Transport
authorBrad Fitzpatrick <bradfitz@golang.org>
Wed, 13 May 2015 19:41:56 +0000 (12:41 -0700)
committerBrad Fitzpatrick <bradfitz@golang.org>
Thu, 14 May 2015 00:29:24 +0000 (00:29 +0000)
The Transport's writer to the remote server is wrapped in a
bufio.Writer to suppress many small writes while writing headers and
trailers. However, when writing the request body, the buffering may get
in the way if the request body is arriving slowly.

Because the io.Copy from the Request.Body to the writer is already
buffered, the outer bufio.Writer is unnecessary and prevents small
Request.Body.Reads from going to the server right away. (and the
io.Reader contract does say to return when you've got something,
instead of blocking waiting for more). After the body is finished, the
Transport's bufio.Writer is still used for any trailers following.

A previous attempted fix for this made the chunk writer always flush
if the underlying type was a bufio.Writer, but that is not quite
correct. This CL instead makes it opt-in by using a private sentinel
type (wrapping a *bufio.Writer) to the chunk writer that requests
Flushes after each chunk body (the chunk header & chunk body are still
buffered together into one write).

Fixes #6574

Change-Id: Icefcdf17130c9e285c80b69af295bfd3e72c3a70
Reviewed-on: https://go-review.googlesource.com/10021
Reviewed-by: Andrew Gerrand <adg@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>

src/net/http/internal/chunked.go
src/net/http/transfer.go
src/net/http/transport_test.go

index 9294deb3e5e36d6ac3f7c4cc47c55ac1e805e55f..6d7c69874d9d6f6acbce282a28cb80cd34d070a9 100644 (file)
@@ -173,8 +173,12 @@ func (cw *chunkedWriter) Write(data []byte) (n int, err error) {
                err = io.ErrShortWrite
                return
        }
-       _, err = io.WriteString(cw.Wire, "\r\n")
-
+       if _, err = io.WriteString(cw.Wire, "\r\n"); err != nil {
+               return
+       }
+       if bw, ok := cw.Wire.(*FlushAfterChunkWriter); ok {
+               err = bw.Flush()
+       }
        return
 }
 
@@ -183,6 +187,15 @@ func (cw *chunkedWriter) Close() error {
        return err
 }
 
+// FlushAfterChunkWriter signals from the caller of NewChunkedWriter
+// that each chunk should be followed by a flush. It is used by the
+// http.Transport code to keep the buffering behavior for headers and
+// trailers, but flush out chunks aggressively in the middle for
+// request bodies which may be generated slowly. See Issue 6574.
+type FlushAfterChunkWriter struct {
+       *bufio.Writer
+}
+
 func parseHexUint(v []byte) (n uint64, err error) {
        for _, b := range v {
                n <<= 4
index 564034434549b5a29094f99e9812c5b74185a5c3..289d53dec00c36a49d0df2fb110fa09594c2b1fe 100644 (file)
@@ -43,6 +43,7 @@ type transferWriter struct {
        Close            bool
        TransferEncoding []string
        Trailer          Header
+       IsResponse       bool
 }
 
 func newTransferWriter(r interface{}) (t *transferWriter, err error) {
@@ -89,6 +90,7 @@ func newTransferWriter(r interface{}) (t *transferWriter, err error) {
                        }
                }
        case *Response:
+               t.IsResponse = true
                if rr.Request != nil {
                        t.Method = rr.Request.Method
                }
@@ -206,6 +208,9 @@ func (t *transferWriter) WriteBody(w io.Writer) error {
        // Write body
        if t.Body != nil {
                if chunked(t.TransferEncoding) {
+                       if bw, ok := w.(*bufio.Writer); ok && !t.IsResponse {
+                               w = &internal.FlushAfterChunkWriter{bw}
+                       }
                        cw := internal.NewChunkedWriter(w)
                        _, err = io.Copy(cw, t.Body)
                        if err == nil {
index ace58896b859205a45da3ccf5fd636fb2d296d97..ca1a3ab407fb7dc1ba56eec07c22436deab884c5 100644 (file)
@@ -23,6 +23,7 @@ import (
        "net/http/httptest"
        "net/url"
        "os"
+       "reflect"
        "runtime"
        "strconv"
        "strings"
@@ -2447,6 +2448,104 @@ func TestTransportDialCancelRace(t *testing.T) {
        }
 }
 
+// logWritesConn is a net.Conn that logs each Write call to writes
+// and then proxies to w.
+// It proxies Read calls to a reader it receives from rch.
+type logWritesConn struct {
+       net.Conn // nil. crash on use.
+
+       w io.Writer
+
+       rch <-chan io.Reader
+       r   io.Reader // nil until received by rch
+
+       mu     sync.Mutex
+       writes []string
+}
+
+func (c *logWritesConn) Write(p []byte) (n int, err error) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       c.writes = append(c.writes, string(p))
+       return c.w.Write(p)
+}
+
+func (c *logWritesConn) Read(p []byte) (n int, err error) {
+       if c.r == nil {
+               c.r = <-c.rch
+       }
+       return c.r.Read(p)
+}
+
+func (c *logWritesConn) Close() error { return nil }
+
+// Issue 6574
+func TestTransportFlushesBodyChunks(t *testing.T) {
+       defer afterTest(t)
+       resBody := make(chan io.Reader, 1)
+       connr, connw := io.Pipe() // connection pipe pair
+       lw := &logWritesConn{
+               rch: resBody,
+               w:   connw,
+       }
+       tr := &Transport{
+               Dial: func(network, addr string) (net.Conn, error) {
+                       return lw, nil
+               },
+       }
+       bodyr, bodyw := io.Pipe() // body pipe pair
+       go func() {
+               defer bodyw.Close()
+               for i := 0; i < 3; i++ {
+                       fmt.Fprintf(bodyw, "num%d\n", i)
+               }
+       }()
+       resc := make(chan *Response)
+       go func() {
+               req, _ := NewRequest("POST", "http://localhost:8080", bodyr)
+               req.Header.Set("User-Agent", "x") // known value for test
+               res, err := tr.RoundTrip(req)
+               if err != nil {
+                       t.Error("RoundTrip: %v", err)
+                       close(resc)
+                       return
+               }
+               resc <- res
+
+       }()
+       // Fully consume the request before checking the Write log vs. want.
+       req, err := ReadRequest(bufio.NewReader(connr))
+       if err != nil {
+               t.Fatal(err)
+       }
+       io.Copy(ioutil.Discard, req.Body)
+
+       // Unblock the transport's roundTrip goroutine.
+       resBody <- strings.NewReader("HTTP/1.1 204 No Content\r\nConnection: close\r\n\r\n")
+       res, ok := <-resc
+       if !ok {
+               return
+       }
+       defer res.Body.Close()
+
+       want := []string{
+               // Because Request.ContentLength = 0, the body is sniffed for 1 byte to determine whether there's content.
+               // That explains the initial "num0" being split into "n" and "um0".
+               // The first byte is included with the request headers Write. Perhaps in the future
+               // we will want to flush the headers out early if the first byte of the request body is
+               // taking a long time to arrive. But not yet.
+               "POST / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: x\r\nTransfer-Encoding: chunked\r\nAccept-Encoding: gzip\r\n\r\n" +
+                       "1\r\nn\r\n",
+               "4\r\num0\n\r\n",
+               "5\r\nnum1\n\r\n",
+               "5\r\nnum2\n\r\n",
+               "0\r\n\r\n",
+       }
+       if !reflect.DeepEqual(lw.writes, want) {
+               t.Errorf("Writes differed.\n Got: %q\nWant: %q\n", lw.writes, want)
+       }
+}
+
 func wantBody(res *http.Response, err error, want string) error {
        if err != nil {
                return err