err = io.ErrShortWrite
return
}
- _, err = io.WriteString(cw.Wire, "\r\n")
-
+ if _, err = io.WriteString(cw.Wire, "\r\n"); err != nil {
+ return
+ }
+ if bw, ok := cw.Wire.(*FlushAfterChunkWriter); ok {
+ err = bw.Flush()
+ }
return
}
return err
}
+// FlushAfterChunkWriter signals from the caller of NewChunkedWriter
+// that each chunk should be followed by a flush. It is used by the
+// http.Transport code to keep the buffering behavior for headers and
+// trailers, but flush out chunks aggressively in the middle for
+// request bodies which may be generated slowly. See Issue 6574.
+type FlushAfterChunkWriter struct {
+ *bufio.Writer
+}
+
func parseHexUint(v []byte) (n uint64, err error) {
for _, b := range v {
n <<= 4
Close bool
TransferEncoding []string
Trailer Header
+ IsResponse bool
}
func newTransferWriter(r interface{}) (t *transferWriter, err error) {
}
}
case *Response:
+ t.IsResponse = true
if rr.Request != nil {
t.Method = rr.Request.Method
}
// Write body
if t.Body != nil {
if chunked(t.TransferEncoding) {
+ if bw, ok := w.(*bufio.Writer); ok && !t.IsResponse {
+ w = &internal.FlushAfterChunkWriter{bw}
+ }
cw := internal.NewChunkedWriter(w)
_, err = io.Copy(cw, t.Body)
if err == nil {
"net/http/httptest"
"net/url"
"os"
+ "reflect"
"runtime"
"strconv"
"strings"
}
}
+// logWritesConn is a net.Conn that logs each Write call to writes
+// and then proxies to w.
+// It proxies Read calls to a reader it receives from rch.
+type logWritesConn struct {
+ net.Conn // nil. crash on use.
+
+ w io.Writer
+
+ rch <-chan io.Reader
+ r io.Reader // nil until received by rch
+
+ mu sync.Mutex
+ writes []string
+}
+
+func (c *logWritesConn) Write(p []byte) (n int, err error) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.writes = append(c.writes, string(p))
+ return c.w.Write(p)
+}
+
+func (c *logWritesConn) Read(p []byte) (n int, err error) {
+ if c.r == nil {
+ c.r = <-c.rch
+ }
+ return c.r.Read(p)
+}
+
+func (c *logWritesConn) Close() error { return nil }
+
+// Issue 6574
+func TestTransportFlushesBodyChunks(t *testing.T) {
+ defer afterTest(t)
+ resBody := make(chan io.Reader, 1)
+ connr, connw := io.Pipe() // connection pipe pair
+ lw := &logWritesConn{
+ rch: resBody,
+ w: connw,
+ }
+ tr := &Transport{
+ Dial: func(network, addr string) (net.Conn, error) {
+ return lw, nil
+ },
+ }
+ bodyr, bodyw := io.Pipe() // body pipe pair
+ go func() {
+ defer bodyw.Close()
+ for i := 0; i < 3; i++ {
+ fmt.Fprintf(bodyw, "num%d\n", i)
+ }
+ }()
+ resc := make(chan *Response)
+ go func() {
+ req, _ := NewRequest("POST", "http://localhost:8080", bodyr)
+ req.Header.Set("User-Agent", "x") // known value for test
+ res, err := tr.RoundTrip(req)
+ if err != nil {
+ t.Error("RoundTrip: %v", err)
+ close(resc)
+ return
+ }
+ resc <- res
+
+ }()
+ // Fully consume the request before checking the Write log vs. want.
+ req, err := ReadRequest(bufio.NewReader(connr))
+ if err != nil {
+ t.Fatal(err)
+ }
+ io.Copy(ioutil.Discard, req.Body)
+
+ // Unblock the transport's roundTrip goroutine.
+ resBody <- strings.NewReader("HTTP/1.1 204 No Content\r\nConnection: close\r\n\r\n")
+ res, ok := <-resc
+ if !ok {
+ return
+ }
+ defer res.Body.Close()
+
+ want := []string{
+ // Because Request.ContentLength = 0, the body is sniffed for 1 byte to determine whether there's content.
+ // That explains the initial "num0" being split into "n" and "um0".
+ // The first byte is included with the request headers Write. Perhaps in the future
+ // we will want to flush the headers out early if the first byte of the request body is
+ // taking a long time to arrive. But not yet.
+ "POST / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: x\r\nTransfer-Encoding: chunked\r\nAccept-Encoding: gzip\r\n\r\n" +
+ "1\r\nn\r\n",
+ "4\r\num0\n\r\n",
+ "5\r\nnum1\n\r\n",
+ "5\r\nnum2\n\r\n",
+ "0\r\n\r\n",
+ }
+ if !reflect.DeepEqual(lw.writes, want) {
+ t.Errorf("Writes differed.\n Got: %q\nWant: %q\n", lw.writes, want)
+ }
+}
+
func wantBody(res *http.Response, err error, want string) error {
if err != nil {
return err