From ff29be14c4c63912963c442109da56a98960ea2d Mon Sep 17 00:00:00 2001
From: Brad Fitzpatrick
Date: Wed, 29 Jan 2014 13:44:21 +0100
Subject: [PATCH] net/http: read as much as possible (including EOF) during
 chunked reads

This is the chunked half of https://golang.org/cl/49570044 .

We want full reads to return EOF as early as possible, when we know
we're at the end, so http.Transport client connections are eagerly
re-used in the common case, even if no Read or Close follows.

To do this, make chunkedReader.Read fill up its argument buffer b as
much as possible, as long as that doesn't involve doing any more
blocking reads to read chunk headers. That means if we have a chunk
EOF ("0\r\n") sitting in the incoming bufio.Reader, we see it and set
EOF on our final Read.

LGTM=adg
R=adg
CC=golang-codereviews
https://golang.org/cl/58240043
---
 src/pkg/net/http/chunked.go               | 58 ++++++++++-----
 src/pkg/net/http/chunked_test.go          | 77 +++++++++++++++++++-
 src/pkg/net/http/httputil/chunked.go      | 74 +++++++++++--------
 src/pkg/net/http/httputil/chunked_test.go | 87 ++++++++++++++++++++---
 src/pkg/net/http/httputil/httputil.go     | 32 +++++++++
 src/pkg/net/http/httputil/persist.go      |  2 -
 src/pkg/net/http/transfer_test.go         | 27 +++++++
 src/pkg/net/http/transport_test.go        | 55 ++++++++------
 8 files changed, 328 insertions(+), 84 deletions(-)
 create mode 100644 src/pkg/net/http/httputil/httputil.go

diff --git a/src/pkg/net/http/chunked.go b/src/pkg/net/http/chunked.go
index 91db017245..749f29d326 100644
--- a/src/pkg/net/http/chunked.go
+++ b/src/pkg/net/http/chunked.go
@@ -4,13 +4,14 @@
 
 // The wire protocol for HTTP's "chunked" Transfer-Encoding.
 
-// This code is duplicated in httputil/chunked.go.
+// This code is duplicated in net/http and net/http/httputil.
 // Please make any changes in both files.
 
 package http
 
 import (
 	"bufio"
+	"bytes"
 	"errors"
 	"fmt"
 	"io"
@@ -57,26 +58,45 @@ func (cr *chunkedReader) beginChunk() {
 	}
 }
 
-func (cr *chunkedReader) Read(b []uint8) (n int, err error) {
-	if cr.err != nil {
-		return 0, cr.err
+func (cr *chunkedReader) chunkHeaderAvailable() bool {
+	n := cr.r.Buffered()
+	if n > 0 {
+		peek, _ := cr.r.Peek(n)
+		return bytes.IndexByte(peek, '\n') >= 0
 	}
-	if cr.n == 0 {
-		cr.beginChunk()
-		if cr.err != nil {
-			return 0, cr.err
+	return false
+}
+
+func (cr *chunkedReader) Read(b []uint8) (n int, err error) {
+	for cr.err == nil {
+		if cr.n == 0 {
+			if n > 0 && !cr.chunkHeaderAvailable() {
+				// We've read enough. Don't potentially block
+				// reading a new chunk header.
+				break
+			}
+			cr.beginChunk()
+			continue
 		}
-	}
-	if uint64(len(b)) > cr.n {
-		b = b[0:cr.n]
-	}
-	n, cr.err = cr.r.Read(b)
-	cr.n -= uint64(n)
-	if cr.n == 0 && cr.err == nil {
-		// end of chunk (CRLF)
-		if _, cr.err = io.ReadFull(cr.r, cr.buf[:]); cr.err == nil {
-			if cr.buf[0] != '\r' || cr.buf[1] != '\n' {
-				cr.err = errors.New("malformed chunked encoding")
+		if len(b) == 0 {
+			break
+		}
+		rbuf := b
+		if uint64(len(rbuf)) > cr.n {
+			rbuf = rbuf[:cr.n]
+		}
+		var n0 int
+		n0, cr.err = cr.r.Read(rbuf)
+		n += n0
+		b = b[n0:]
+		cr.n -= uint64(n0)
+		// If we're at the end of a chunk, read the next two
+		// bytes to verify they are "\r\n".
+		if cr.n == 0 && cr.err == nil {
+			if _, cr.err = io.ReadFull(cr.r, cr.buf[:2]); cr.err == nil {
+				if cr.buf[0] != '\r' || cr.buf[1] != '\n' {
+					cr.err = errors.New("malformed chunked encoding")
+				}
 			}
 		}
 	}
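
The effect of the rewritten Read loop is easiest to see from user code.
Here is a minimal sketch of the new behavior (assuming this patch is
applied; it goes through the exported net/http/httputil API, since
net/http's chunkedReader is unexported):

    package main

    import (
        "bufio"
        "fmt"
        "net/http/httputil"
        "strings"
    )

    func main() {
        // The terminating "0\r\n" is already sitting in the bufio.Reader's
        // buffer, so the reader can report io.EOF on the same Read call
        // that returns the last of the body.
        r := httputil.NewChunkedReader(bufio.NewReader(
            strings.NewReader("3\r\nfoo\r\n0\r\n")))
        buf := make([]byte, 10)
        n, err := r.Read(buf)
        fmt.Printf("n=%d err=%v body=%q\n", n, err, buf[:n])
        // With this patch: n=3 err=EOF body="foo".
        // Before it: n=3 err=<nil>, and a second Read was needed to see
        // EOF, which delayed connection re-use in the Transport.
    }
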
diff --git a/src/pkg/net/http/chunked_test.go b/src/pkg/net/http/chunked_test.go
index ae32a69ea7..34544790af 100644
--- a/src/pkg/net/http/chunked_test.go
+++ b/src/pkg/net/http/chunked_test.go
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-// This code is duplicated in httputil/chunked_test.go.
+// This code is duplicated in net/http and net/http/httputil.
 // Please make any changes in both files.
 
 package http
@@ -13,6 +13,7 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"strings"
 	"testing"
 )
 
@@ -41,7 +42,77 @@ func TestChunk(t *testing.T) {
 	}
 }
 
+func TestChunkReadMultiple(t *testing.T) {
+	// Bunch of small chunks, all read together.
+	{
+		var b bytes.Buffer
+		w := newChunkedWriter(&b)
+		w.Write([]byte("foo"))
+		w.Write([]byte("bar"))
+		w.Close()
+
+		r := newChunkedReader(&b)
+		buf := make([]byte, 10)
+		n, err := r.Read(buf)
+		if n != 6 || err != io.EOF {
+			t.Errorf("Read = %d, %v; want 6, EOF", n, err)
+		}
+		buf = buf[:n]
+		if string(buf) != "foobar" {
+			t.Errorf("Read = %q; want %q", buf, "foobar")
+		}
+	}
+
+	// One big chunk followed by a little chunk, but the small bufio.Reader size
+	// should prevent the second chunk header from being read.
+	{
+		var b bytes.Buffer
+		w := newChunkedWriter(&b)
+		// fillBufChunk is 11 bytes + 3 bytes header + 2 bytes footer = 16 bytes,
+		// the same as the bufio ReaderSize below (the minimum), so even
+		// though we're going to try to Read with a buffer large enough to also
+		// receive "foo", the second chunk header won't be read yet.
+		const fillBufChunk = "0123456789a"
+		const shortChunk = "foo"
+		w.Write([]byte(fillBufChunk))
+		w.Write([]byte(shortChunk))
+		w.Close()
+
+		r := newChunkedReader(bufio.NewReaderSize(&b, 16))
+		buf := make([]byte, len(fillBufChunk)+len(shortChunk))
+		n, err := r.Read(buf)
+		if n != len(fillBufChunk) || err != nil {
+			t.Errorf("Read = %d, %v; want %d, nil", n, err, len(fillBufChunk))
+		}
+		buf = buf[:n]
+		if string(buf) != fillBufChunk {
+			t.Errorf("Read = %q; want %q", buf, fillBufChunk)
+		}
+
+		n, err = r.Read(buf)
+		if n != len(shortChunk) || err != io.EOF {
+			t.Errorf("Read = %d, %v; want %d, EOF", n, err, len(shortChunk))
+		}
+	}
+
+	// And test that we see an EOF chunk, even though our buffer is already full:
+	{
+		r := newChunkedReader(bufio.NewReader(strings.NewReader("3\r\nfoo\r\n0\r\n")))
+		buf := make([]byte, 3)
+		n, err := r.Read(buf)
+		if n != 3 || err != io.EOF {
+			t.Errorf("Read = %d, %v; want 3, EOF", n, err)
+		}
+		if string(buf) != "foo" {
+			t.Errorf("buf = %q; want foo", buf)
+		}
+	}
+}
+
 func TestChunkReaderAllocs(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping in short mode")
+	}
 	var buf bytes.Buffer
 	w := newChunkedWriter(&buf)
 	a, b, c := []byte("aaaaaa"), []byte("bbbbbbbbbbbb"), []byte("cccccccccccccccccccccccc")
@@ -53,7 +124,7 @@ func TestChunkReaderAllocs(t *testing.T) {
 	readBuf := make([]byte, len(a)+len(b)+len(c)+1)
 	byter := bytes.NewReader(buf.Bytes())
 	bufr := bufio.NewReader(byter)
-	mallocs := testing.AllocsPerRun(10, func() {
+	mallocs := testing.AllocsPerRun(100, func() {
 		byter.Seek(0, 0)
 		bufr.Reset(byter)
 		r := newChunkedReader(bufr)
@@ -66,7 +137,7 @@
 		}
 	})
 	if mallocs > 1.5 {
-		t.Logf("mallocs = %v; want 1", mallocs)
+		t.Errorf("mallocs = %v; want 1", mallocs)
 	}
 }
diff --git a/src/pkg/net/http/httputil/chunked.go b/src/pkg/net/http/httputil/chunked.go
index b66d409515..9632bfd19d 100644
--- a/src/pkg/net/http/httputil/chunked.go
+++ b/src/pkg/net/http/httputil/chunked.go
@@ -4,15 +4,14 @@
 
 // The wire protocol for HTTP's "chunked" Transfer-Encoding.
 
-// This code is a duplicate of ../chunked.go with these edits:
-// s/newChunked/NewChunked/g
-// s/package http/package httputil/
+// This code is duplicated in net/http and net/http/httputil.
 // Please make any changes in both files.
 
 package httputil
 
 import (
 	"bufio"
+	"bytes"
 	"errors"
 	"fmt"
 	"io"
@@ -22,13 +21,13 @@ const maxLineLength = 4096 // assumed <= bufio.defaultBufSize
 
 var ErrLineTooLong = errors.New("header line too long")
 
-// NewChunkedReader returns a new chunkedReader that translates the data read from r
+// newChunkedReader returns a new chunkedReader that translates the data read from r
 // out of HTTP "chunked" format before returning it.
 // The chunkedReader returns io.EOF when the final 0-length chunk is read.
 //
-// NewChunkedReader is not needed by normal applications. The http package
+// newChunkedReader is not needed by normal applications. The http package
 // automatically decodes chunking when reading response bodies.
-func NewChunkedReader(r io.Reader) io.Reader {
+func newChunkedReader(r io.Reader) io.Reader {
 	br, ok := r.(*bufio.Reader)
 	if !ok {
 		br = bufio.NewReader(r)
@@ -59,26 +58,45 @@ func (cr *chunkedReader) beginChunk() {
 	}
 }
 
-func (cr *chunkedReader) Read(b []uint8) (n int, err error) {
-	if cr.err != nil {
-		return 0, cr.err
+func (cr *chunkedReader) chunkHeaderAvailable() bool {
+	n := cr.r.Buffered()
+	if n > 0 {
+		peek, _ := cr.r.Peek(n)
+		return bytes.IndexByte(peek, '\n') >= 0
 	}
-	if cr.n == 0 {
-		cr.beginChunk()
-		if cr.err != nil {
-			return 0, cr.err
+	return false
+}
+
+func (cr *chunkedReader) Read(b []uint8) (n int, err error) {
+	for cr.err == nil {
+		if cr.n == 0 {
+			if n > 0 && !cr.chunkHeaderAvailable() {
+				// We've read enough. Don't potentially block
+				// reading a new chunk header.
+				break
+			}
+			cr.beginChunk()
+			continue
 		}
-	}
-	if uint64(len(b)) > cr.n {
-		b = b[0:cr.n]
-	}
-	n, cr.err = cr.r.Read(b)
-	cr.n -= uint64(n)
-	if cr.n == 0 && cr.err == nil {
-		// end of chunk (CRLF)
-		if _, cr.err = io.ReadFull(cr.r, cr.buf[:]); cr.err == nil {
-			if cr.buf[0] != '\r' || cr.buf[1] != '\n' {
-				cr.err = errors.New("malformed chunked encoding")
+		if len(b) == 0 {
+			break
+		}
+		rbuf := b
+		if uint64(len(rbuf)) > cr.n {
+			rbuf = rbuf[:cr.n]
+		}
+		var n0 int
+		n0, cr.err = cr.r.Read(rbuf)
+		n += n0
+		b = b[n0:]
+		cr.n -= uint64(n0)
+		// If we're at the end of a chunk, read the next two
+		// bytes to verify they are "\r\n".
+		if cr.n == 0 && cr.err == nil {
+			if _, cr.err = io.ReadFull(cr.r, cr.buf[:2]); cr.err == nil {
+				if cr.buf[0] != '\r' || cr.buf[1] != '\n' {
+					cr.err = errors.New("malformed chunked encoding")
+				}
 			}
 		}
 	}
@@ -117,16 +135,16 @@ func isASCIISpace(b byte) bool {
 	return b == ' ' || b == '\t' || b == '\n' || b == '\r'
 }
 
-// NewChunkedWriter returns a new chunkedWriter that translates writes into HTTP
+// newChunkedWriter returns a new chunkedWriter that translates writes into HTTP
 // "chunked" format before writing them to w. Closing the returned chunkedWriter
 // sends the final 0-length chunk that marks the end of the stream.
 //
-// NewChunkedWriter is not needed by normal applications. The http
+// newChunkedWriter is not needed by normal applications. The http
 // package adds chunking automatically if handlers don't set a
-// Content-Length header. Using NewChunkedWriter inside a handler
+// Content-Length header. Using newChunkedWriter inside a handler
 // would result in double chunking or chunking with a Content-Length
 // length, both of which are wrong.
-func NewChunkedWriter(w io.Writer) io.WriteCloser {
+func newChunkedWriter(w io.Writer) io.WriteCloser {
 	return &chunkedWriter{w}
 }
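
Both copies of chunkHeaderAvailable lean on the same property of
bufio.Reader: Peek(Buffered()) inspects only bytes already in memory and
never issues a new read against the underlying reader, so the check can
never block. A standalone sketch of that idea (headerAvailable is a
hypothetical stand-in for the unexported method):

    package main

    import (
        "bufio"
        "bytes"
        "fmt"
        "strings"
    )

    // headerAvailable reports whether a complete '\n'-terminated line is
    // already sitting in br's buffer. Peek(br.Buffered()) cannot block,
    // because it never reads from the underlying reader.
    func headerAvailable(br *bufio.Reader) bool {
        if n := br.Buffered(); n > 0 {
            peek, _ := br.Peek(n)
            return bytes.IndexByte(peek, '\n') >= 0
        }
        return false
    }

    func main() {
        br := bufio.NewReader(strings.NewReader("5\r\nhello\r\n"))
        fmt.Println(headerAvailable(br)) // false: nothing buffered yet
        br.ReadByte()                    // force one fill of the buffer
        br.UnreadByte()
        fmt.Println(headerAvailable(br)) // true: "5\r\n..." is now buffered
    }
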
diff --git a/src/pkg/net/http/httputil/chunked_test.go b/src/pkg/net/http/httputil/chunked_test.go
index 3fb5fa5265..a7a5774688 100644
--- a/src/pkg/net/http/httputil/chunked_test.go
+++ b/src/pkg/net/http/httputil/chunked_test.go
@@ -2,9 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-// This code is a duplicate of ../chunked_test.go with these edits:
-// s/newChunked/NewChunked/g
-// s/package http/package httputil/
+// This code is duplicated in net/http and net/http/httputil.
 // Please make any changes in both files.
 
 package httputil
@@ -15,13 +13,14 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"strings"
 	"testing"
 )
 
 func TestChunk(t *testing.T) {
 	var b bytes.Buffer
-	w := NewChunkedWriter(&b)
+	w := newChunkedWriter(&b)
 	const chunk1 = "hello, "
 	const chunk2 = "world! 0123456789abcdef"
 	w.Write([]byte(chunk1))
@@ -32,7 +31,7 @@
 		t.Fatalf("chunk writer wrote %q; want %q", g, e)
 	}
 
-	r := NewChunkedReader(&b)
+	r := newChunkedReader(&b)
 	data, err := ioutil.ReadAll(r)
 	if err != nil {
 		t.Logf(`data: "%s"`, data)
@@ -43,9 +42,79 @@
 	}
 }
 
+func TestChunkReadMultiple(t *testing.T) {
+	// Bunch of small chunks, all read together.
+	{
+		var b bytes.Buffer
+		w := newChunkedWriter(&b)
+		w.Write([]byte("foo"))
+		w.Write([]byte("bar"))
+		w.Close()
+
+		r := newChunkedReader(&b)
+		buf := make([]byte, 10)
+		n, err := r.Read(buf)
+		if n != 6 || err != io.EOF {
+			t.Errorf("Read = %d, %v; want 6, EOF", n, err)
+		}
+		buf = buf[:n]
+		if string(buf) != "foobar" {
+			t.Errorf("Read = %q; want %q", buf, "foobar")
+		}
+	}
+
+	// One big chunk followed by a little chunk, but the small bufio.Reader size
+	// should prevent the second chunk header from being read.
+	{
+		var b bytes.Buffer
+		w := newChunkedWriter(&b)
+		// fillBufChunk is 11 bytes + 3 bytes header + 2 bytes footer = 16 bytes,
+		// the same as the bufio ReaderSize below (the minimum), so even
+		// though we're going to try to Read with a buffer large enough to also
+		// receive "foo", the second chunk header won't be read yet.
+		const fillBufChunk = "0123456789a"
+		const shortChunk = "foo"
+		w.Write([]byte(fillBufChunk))
+		w.Write([]byte(shortChunk))
+		w.Close()
+
+		r := newChunkedReader(bufio.NewReaderSize(&b, 16))
+		buf := make([]byte, len(fillBufChunk)+len(shortChunk))
+		n, err := r.Read(buf)
+		if n != len(fillBufChunk) || err != nil {
+			t.Errorf("Read = %d, %v; want %d, nil", n, err, len(fillBufChunk))
+		}
+		buf = buf[:n]
+		if string(buf) != fillBufChunk {
+			t.Errorf("Read = %q; want %q", buf, fillBufChunk)
+		}
+
+		n, err = r.Read(buf)
+		if n != len(shortChunk) || err != io.EOF {
+			t.Errorf("Read = %d, %v; want %d, EOF", n, err, len(shortChunk))
+		}
+	}
+
+	// And test that we see an EOF chunk, even though our buffer is already full:
+	{
+		r := newChunkedReader(bufio.NewReader(strings.NewReader("3\r\nfoo\r\n0\r\n")))
+		buf := make([]byte, 3)
+		n, err := r.Read(buf)
+		if n != 3 || err != io.EOF {
+			t.Errorf("Read = %d, %v; want 3, EOF", n, err)
+		}
+		if string(buf) != "foo" {
+			t.Errorf("buf = %q; want foo", buf)
+		}
+	}
+}
+
 func TestChunkReaderAllocs(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping in short mode")
+	}
 	var buf bytes.Buffer
-	w := NewChunkedWriter(&buf)
+	w := newChunkedWriter(&buf)
 	a, b, c := []byte("aaaaaa"), []byte("bbbbbbbbbbbb"), []byte("cccccccccccccccccccccccc")
 	w.Write(a)
 	w.Write(b)
@@ -55,10 +124,10 @@ func TestChunkReaderAllocs(t *testing.T) {
 	readBuf := make([]byte, len(a)+len(b)+len(c)+1)
 	byter := bytes.NewReader(buf.Bytes())
 	bufr := bufio.NewReader(byter)
-	mallocs := testing.AllocsPerRun(10, func() {
+	mallocs := testing.AllocsPerRun(100, func() {
 		byter.Seek(0, 0)
 		bufr.Reset(byter)
-		r := NewChunkedReader(bufr)
+		r := newChunkedReader(bufr)
 		n, err := io.ReadFull(r, readBuf)
 		if n != len(readBuf)-1 {
 			t.Fatalf("read %d bytes; want %d", n, len(readBuf)-1)
@@ -68,7 +137,7 @@
 		}
 	})
 	if mallocs > 1.5 {
-		t.Logf("mallocs = %v; want 1", mallocs)
+		t.Errorf("mallocs = %v; want 1", mallocs)
 	}
 }
diff --git a/src/pkg/net/http/httputil/httputil.go b/src/pkg/net/http/httputil/httputil.go
new file mode 100644
index 0000000000..74fb6c6556
--- /dev/null
+++ b/src/pkg/net/http/httputil/httputil.go
@@ -0,0 +1,32 @@
+// Copyright 2014 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package httputil provides HTTP utility functions, complementing the
+// more common ones in the net/http package.
+package httputil
+
+import "io"
+
+// NewChunkedReader returns a new chunkedReader that translates the data read from r
+// out of HTTP "chunked" format before returning it.
+// The chunkedReader returns io.EOF when the final 0-length chunk is read.
+//
+// NewChunkedReader is not needed by normal applications. The http package
+// automatically decodes chunking when reading response bodies.
+func NewChunkedReader(r io.Reader) io.Reader {
+	return newChunkedReader(r)
+}
+
+// NewChunkedWriter returns a new chunkedWriter that translates writes into HTTP
+// "chunked" format before writing them to w. Closing the returned chunkedWriter
+// sends the final 0-length chunk that marks the end of the stream.
+//
+// NewChunkedWriter is not needed by normal applications. The http
+// package adds chunking automatically if handlers don't set a
+// Content-Length header. Using NewChunkedWriter inside a handler
+// would result in double chunking or chunking with a Content-Length
+// length, both of which are wrong.
+func NewChunkedWriter(w io.Writer) io.WriteCloser {
+	return newChunkedWriter(w)
+}
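
For reference, the two exported wrappers round-trip like this (a small
sketch using only the public API; io/ioutil matches the Go version this
patch targets):

    package main

    import (
        "bytes"
        "fmt"
        "io/ioutil"
        "net/http/httputil"
    )

    func main() {
        var wire bytes.Buffer
        w := httputil.NewChunkedWriter(&wire)
        w.Write([]byte("hello, "))
        w.Write([]byte("world"))
        w.Close() // writes the terminating 0-length chunk
        fmt.Printf("%q\n", wire.String())
        // "7\r\nhello, \r\n5\r\nworld\r\n0\r\n"

        body, err := ioutil.ReadAll(httputil.NewChunkedReader(&wire))
        if err != nil {
            panic(err)
        }
        fmt.Printf("%q\n", body) // "hello, world"
    }

Note that Close writes only the "0\r\n" terminator; the trailing CRLF
that ends an HTTP message body is left to the caller.
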
diff --git a/src/pkg/net/http/httputil/persist.go b/src/pkg/net/http/httputil/persist.go
index 507938acac..86d23e0370 100644
--- a/src/pkg/net/http/httputil/persist.go
+++ b/src/pkg/net/http/httputil/persist.go
@@ -2,8 +2,6 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-// Package httputil provides HTTP utility functions, complementing the
-// more common ones in the net/http package.
 package httputil
 
 import (
diff --git a/src/pkg/net/http/transfer_test.go b/src/pkg/net/http/transfer_test.go
index fb5ef37a0f..48cd540b9f 100644
--- a/src/pkg/net/http/transfer_test.go
+++ b/src/pkg/net/http/transfer_test.go
@@ -6,6 +6,7 @@ package http
 
 import (
 	"bufio"
+	"io"
 	"strings"
 	"testing"
 )
@@ -35,3 +36,29 @@ func TestBodyReadBadTrailer(t *testing.T) {
 		t.Errorf("final Read was successful (%q), expected error from trailer read", got)
 	}
 }
+
+func TestFinalChunkedBodyReadEOF(t *testing.T) {
+	res, err := ReadResponse(bufio.NewReader(strings.NewReader(
+		"HTTP/1.1 200 OK\r\n"+
+			"Transfer-Encoding: chunked\r\n"+
+			"\r\n"+
+			"0a\r\n"+
+			"Body here\n\r\n"+
+			"09\r\n"+
+			"continued\r\n"+
+			"0\r\n"+
+			"\r\n")), nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	want := "Body here\ncontinued"
+	buf := make([]byte, len(want))
+	n, err := res.Body.Read(buf)
+	if n != len(want) || err != io.EOF {
+		t.Logf("body = %#v", res.Body)
+		t.Errorf("Read = %v, %v; want %d, EOF", n, err, len(want))
+	}
+	if string(buf) != want {
+		t.Errorf("buf = %q; want %q", buf, want)
+	}
+}
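
The new test pins down the user-visible contract: when the terminating
chunk is already buffered, the first Body.Read of a chunked response
returns the full body and io.EOF together. The same behavior can be
reproduced outside the test harness (a sketch assuming this patch is
applied):

    package main

    import (
        "bufio"
        "fmt"
        "net/http"
        "strings"
    )

    func main() {
        res, err := http.ReadResponse(bufio.NewReader(strings.NewReader(
            "HTTP/1.1 200 OK\r\n"+
                "Transfer-Encoding: chunked\r\n"+
                "\r\n"+
                "0a\r\nBody here\n\r\n"+
                "09\r\ncontinued\r\n"+
                "0\r\n\r\n")), nil)
        if err != nil {
            panic(err)
        }
        buf := make([]byte, len("Body here\ncontinued"))
        n, err := res.Body.Read(buf)
        fmt.Printf("n=%d err=%v body=%q\n", n, err, buf[:n])
        // With this patch: n=19 err=EOF body="Body here\ncontinued"
    }
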
diff --git a/src/pkg/net/http/transport_test.go b/src/pkg/net/http/transport_test.go
index 21a1f114d3..a9d05fa09a 100644
--- a/src/pkg/net/http/transport_test.go
+++ b/src/pkg/net/http/transport_test.go
@@ -277,41 +277,50 @@ func TestTransportReadToEndReusesConn(t *testing.T) {
 	defer afterTest(t)
 	const msg = "foobar"
 
-	addrSeen := make(map[string]int)
+	var addrSeen map[string]int
 	ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
 		addrSeen[r.RemoteAddr]++
-		w.Header().Set("Content-Type", strconv.Itoa(len(msg)))
-		w.WriteHeader(200)
+		if r.URL.Path == "/chunked/" {
+			w.WriteHeader(200)
+			w.(http.Flusher).Flush()
+		} else {
+			w.Header().Set("Content-Length", strconv.Itoa(len(msg)))
+			w.WriteHeader(200)
+		}
 		w.Write([]byte(msg))
 	}))
 	defer ts.Close()
 
 	buf := make([]byte, len(msg))
-	for i := 0; i < 3; i++ {
-		res, err := http.Get(ts.URL)
-		if err != nil {
-			t.Errorf("Get: %v", err)
-			continue
-		}
-		// We want to close this body eventually (before the
-		// defer afterTest at top runs), but not before the
-		// len(addrSeen) check at the bottom of this test,
-		// since Closing this early in the loop would risk
-		// making connections be re-used for the wrong reason.
-		defer res.Body.Close()
+	for pi, path := range []string{"/content-length/", "/chunked/"} {
+		wantLen := []int{len(msg), -1}[pi]
+		addrSeen = make(map[string]int)
+		for i := 0; i < 3; i++ {
+			res, err := http.Get(ts.URL + path)
+			if err != nil {
+				t.Errorf("Get %s: %v", path, err)
+				continue
+			}
+			// We want to close this body eventually (before the
+			// defer afterTest at top runs), but not before the
+			// len(addrSeen) check at the bottom of this test,
+			// since Closing this early in the loop would risk
+			// making connections be re-used for the wrong reason.
+			defer res.Body.Close()
 
-		if res.ContentLength != int64(len(msg)) {
-			t.Errorf("res.ContentLength = %d; want %d", res.ContentLength, len(msg))
+			if res.ContentLength != int64(wantLen) {
+				t.Errorf("%s res.ContentLength = %d; want %d", path, res.ContentLength, wantLen)
+			}
+			n, err := res.Body.Read(buf)
+			if n != len(msg) || err != io.EOF {
+				t.Errorf("%s Read = %v, %v; want %d, EOF", path, n, err, len(msg))
+			}
 		}
-		n, err := res.Body.Read(buf)
-		if n != len(msg) || err != io.EOF {
-			t.Errorf("Read = %v, %v; want 6, EOF", n, err)
+		if len(addrSeen) != 1 {
+			t.Errorf("for %s, server saw %d distinct client addresses; want 1", path, len(addrSeen))
 		}
 	}
-
-	if len(addrSeen) != 1 {
-		t.Errorf("server saw %d distinct client addresses; want 1", len(addrSeen))
-	}
 }
 
 func TestTransportMaxPerHostIdleConns(t *testing.T) {
-- 
2.48.1
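
Finally, the transport test's point in a loose, runnable form: once Read
returns io.EOF eagerly, the Transport can return the connection to its
idle pool at that moment, so repeated requests arrive from a single
client address. (An illustration, not part of the patch; getting the
body and EOF in one Read depends on the whole chunked body being
buffered, which is the common case on a loopback connection.)

    package main

    import (
        "fmt"
        "io"
        "net/http"
        "net/http/httptest"
    )

    func main() {
        const msg = "foobar"
        addrs := make(map[string]int)
        ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            addrs[r.RemoteAddr]++
            w.WriteHeader(200)
            w.(http.Flusher).Flush() // no Content-Length, so the body is chunked
            w.Write([]byte(msg))
        }))
        defer ts.Close()

        buf := make([]byte, len(msg))
        for i := 0; i < 3; i++ {
            res, err := http.Get(ts.URL)
            if err != nil {
                panic(err)
            }
            n, err := res.Body.Read(buf)
            fmt.Printf("read %d bytes, err=%v (EOF: %v)\n", n, err, err == io.EOF)
            res.Body.Close()
        }
        fmt.Println("distinct client addresses seen:", len(addrs)) // 1 if re-used
    }
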