]> Cypherpunks repositories - gostls13.git/commitdiff
compress/flate: make Reader.Read return io.EOF eagerly
authorJoe Tsai <joetsai@digital-static.net>
Wed, 30 Mar 2016 05:04:03 +0000 (22:04 -0700)
committerBrad Fitzpatrick <bradfitz@golang.org>
Thu, 31 Mar 2016 22:19:12 +0000 (22:19 +0000)
Rather than checking the block final bit on the next invocation
of nextBlock, we check it at the termination of the current block.
This ensures that we return (n, io.EOF) instead of (0, io.EOF)
more frequently for most streams.

However, there are certain situations where an eager io.EOF is not done:
1) We previously returned from Read because the write buffer of the internal
dictionary was full, and it just so happens that there is no more data
remaining in the stream.
2) There exists a [non-final, empty, raw block] after all blocks that
actually contain uncompressed data. We cannot return io.EOF eagerly here
since it would break flushing semantics.

Both situations happen infrequently, but it is still important to note that
this change does *not* guarantee that flate will *always* return (n, io.EOF).

Furthermore, this CL makes no changes to the pattern of ReadByte calls
to the underlying io.ByteReader.

Below is the motivation for this change, pulling the text from
@bradfitz's CL/21290:

net/http and other things work better when io.Reader implementations
return (n, io.EOF) at the end, instead of (n, nil) followed by (0,
io.EOF). Both are legal, but the standard library has been moving
towards n+io.EOF.

An investigation of net/http connection re-use in
https://github.com/google/go-github/pull/317 revealed that with gzip
compression + http/1.1 chunking, the net/http package was not
automatically reusing the underlying TCP connections when the final
EOF bytes were already read off the wire. The net/http package only
reuses the connection if the underlying Readers (many of them nested
in this case) all eagerly return io.EOF.

Previous related CLs:
    https://golang.org/cl/76400046 - tls.Reader
    https://golang.org/cl/58240043 - http chunked reader

In addition to net/http, this behavior also helps things like
ioutil.ReadAll (see comments about performance improvements in
https://codereview.appspot.com/49570044)

Updates #14867
Updates google/go-github#317

Change-Id: I637c45552efb561d34b13ed918b73c660f668378
Reviewed-on: https://go-review.googlesource.com/21302
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
src/compress/flate/flate_test.go
src/compress/flate/inflate.go

index 341d807131351c6ea2a39969537b5e5d609ce687..83c20498cc490d547add9c401f0c49619cc6a1b2 100644 (file)
@@ -272,3 +272,81 @@ func TestTruncatedStreams(t *testing.T) {
                }
        }
 }
+
+// Verify that flate.Reader.Read returns (n, io.EOF) instead
+// of (n, nil) + (0, io.EOF) when possible.
+//
+// This helps net/http.Transport reuse HTTP/1 connections more
+// aggressively.
+//
+// See https://github.com/google/go-github/pull/317 for background.
+func TestReaderEarlyEOF(t *testing.T) {
+       testSizes := []int{
+               1, 2, 3, 4, 5, 6, 7, 8,
+               100, 1000, 10000, 100000,
+               128, 1024, 16384, 131072,
+
+               // Testing multiples of windowSize triggers the case
+               // where Read will fail to return an early io.EOF.
+               windowSize * 1, windowSize * 2, windowSize * 3,
+       }
+
+       var maxSize int
+       for _, n := range testSizes {
+               if maxSize < n {
+                       maxSize = n
+               }
+       }
+
+       readBuf := make([]byte, 40)
+       data := make([]byte, maxSize)
+       for i := range data {
+               data[i] = byte(i)
+       }
+
+       for _, sz := range testSizes {
+               if testing.Short() && sz > windowSize {
+                       continue
+               }
+               for _, flush := range []bool{true, false} {
+                       earlyEOF := true // Do we expect early io.EOF?
+
+                       var buf bytes.Buffer
+                       w, _ := NewWriter(&buf, 5)
+                       w.Write(data[:sz])
+                       if flush {
+                               // If a Flush occurs after all the actual data, the flushing
+                               // semantics dictate that we will observe a (0, io.EOF) since
+                               // Read must return data before it knows that the stream ended.
+                               w.Flush()
+                               earlyEOF = false
+                       }
+                       w.Close()
+
+                       r := NewReader(&buf)
+                       for {
+                               n, err := r.Read(readBuf)
+                               if err == io.EOF {
+                                       // If the availWrite == windowSize, then that means that the
+                                       // previous Read returned because the write buffer was full
+                                       // and it just so happened that the stream had no more data.
+                                       // This situation is rare, but unavoidable.
+                                       if r.(*decompressor).dict.availWrite() == windowSize {
+                                               earlyEOF = false
+                                       }
+
+                                       if n == 0 && earlyEOF {
+                                               t.Errorf("On size:%d flush:%v, Read() = (0, io.EOF), want (n, io.EOF)", sz, flush)
+                                       }
+                                       if n != 0 && !earlyEOF {
+                                               t.Errorf("On size:%d flush:%v, Read() = (%d, io.EOF), want (0, io.EOF)", sz, flush, n)
+                                       }
+                                       break
+                               }
+                               if err != nil {
+                                       t.Fatal(err)
+                               }
+                       }
+               }
+       }
+}
index 6b0657b79914e8b9d5540f39b55b80e127bf9d49..d5f55eab342017a9b6ca6ccbb7d108870ab2e0f9 100644 (file)
@@ -299,15 +299,6 @@ type decompressor struct {
 }
 
 func (f *decompressor) nextBlock() {
-       if f.final {
-               if f.dict.availRead() > 0 {
-                       f.toRead = f.dict.readFlush()
-                       f.step = (*decompressor).nextBlock
-                       return
-               }
-               f.err = io.EOF
-               return
-       }
        for f.nb < 1+2 {
                if f.err = f.moreBits(); f.err != nil {
                        return
@@ -345,6 +336,9 @@ func (f *decompressor) Read(b []byte) (int, error) {
                if len(f.toRead) > 0 {
                        n := copy(b, f.toRead)
                        f.toRead = f.toRead[n:]
+                       if len(f.toRead) == 0 {
+                               return n, f.err
+                       }
                        return n, nil
                }
                if f.err != nil {
@@ -512,8 +506,7 @@ readLiteral:
                        }
                        goto readLiteral
                case v == 256:
-                       // Done with huffman block; read next block.
-                       f.step = (*decompressor).nextBlock
+                       f.finishBlock()
                        return
                // otherwise, reference to older data
                case v < 265:
@@ -648,7 +641,7 @@ func (f *decompressor) dataBlock() {
 
        if n == 0 {
                f.toRead = f.dict.readFlush()
-               f.step = (*decompressor).nextBlock
+               f.finishBlock()
                return
        }
 
@@ -681,6 +674,16 @@ func (f *decompressor) copyData() {
                f.step = (*decompressor).copyData
                return
        }
+       f.finishBlock()
+}
+
+func (f *decompressor) finishBlock() {
+       if f.final {
+               if f.dict.availRead() > 0 {
+                       f.toRead = f.dict.readFlush()
+               }
+               f.err = io.EOF
+       }
        f.step = (*decompressor).nextBlock
 }