]> Cypherpunks repositories - gostls13.git/commitdiff
net/http: pool transport gzip readers
authorAlexander Yastrebov <yastrebov.alex@gmail.com>
Wed, 3 Sep 2025 10:09:08 +0000 (10:09 +0000)
committerGopher Robot <gobot@golang.org>
Sat, 6 Sep 2025 10:00:56 +0000 (03:00 -0700)
goos: linux
goarch: amd64
pkg: net/http
             │   HEAD~1    │              HEAD              │
             │   sec/op    │    sec/op     vs base          │
ClientGzip-8   621.0µ ± 2%   616.3µ ± 10%  ~ (p=0.971 n=10)

             │    HEAD~1     │                 HEAD                 │
             │     B/op      │     B/op      vs base                │
ClientGzip-8   49.765Ki ± 0%   9.514Ki ± 2%  -80.88% (p=0.000 n=10)

             │   HEAD~1   │               HEAD                │
             │ allocs/op  │ allocs/op   vs base               │
ClientGzip-8   57.00 ± 0%   52.00 ± 0%  -8.77% (p=0.000 n=10)

Allocation saving comes from absent compress/flate.(*dictDecoder).init

This change also improves concurrent body read detection by returning an explicit error.

Updates #61353

Change-Id: I380acfca912dc009b3b9c8283e27b3526cedd546
GitHub-Last-Rev: df12f6a48af4854ba686fe431a9aeb6d9ba3c303
GitHub-Pull-Request: golang/go#61390
Reviewed-on: https://go-review.googlesource.com/c/go/+/510255
Reviewed-by: Sean Liao <sean@liao.dev>
Auto-Submit: Michael Pratt <mpratt@google.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Cherry Mui <cherryyz@google.com>
src/net/http/serve_test.go
src/net/http/transport.go

index 7e3e490af32679c5730f19f03b8860bc5cedd221..aee6288f3b253b746f1a748caee246cb3a8aee86 100644 (file)
@@ -12,6 +12,7 @@ import (
        "compress/gzip"
        "compress/zlib"
        "context"
+       crand "crypto/rand"
        "crypto/tls"
        "crypto/x509"
        "encoding/json"
@@ -5281,8 +5282,8 @@ func benchmarkClientServerParallel(b *testing.B, parallelism int, mode testMode)
 func BenchmarkServer(b *testing.B) {
        b.ReportAllocs()
        // Child process mode;
-       if url := os.Getenv("TEST_BENCH_SERVER_URL"); url != "" {
-               n, err := strconv.Atoi(os.Getenv("TEST_BENCH_CLIENT_N"))
+       if url := os.Getenv("GO_TEST_BENCH_SERVER_URL"); url != "" {
+               n, err := strconv.Atoi(os.Getenv("GO_TEST_BENCH_CLIENT_N"))
                if err != nil {
                        panic(err)
                }
@@ -5316,8 +5317,8 @@ func BenchmarkServer(b *testing.B) {
 
        cmd := testenv.Command(b, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkServer$")
        cmd.Env = append([]string{
-               fmt.Sprintf("TEST_BENCH_CLIENT_N=%d", b.N),
-               fmt.Sprintf("TEST_BENCH_SERVER_URL=%s", ts.URL),
+               fmt.Sprintf("GO_TEST_BENCH_CLIENT_N=%d", b.N),
+               fmt.Sprintf("GO_TEST_BENCH_SERVER_URL=%s", ts.URL),
        }, os.Environ()...)
        out, err := cmd.CombinedOutput()
        if err != nil {
@@ -5338,30 +5339,54 @@ func getNoBody(urlStr string) (*Response, error) {
 // A benchmark for profiling the client without the HTTP server code.
 // The server code runs in a subprocess.
 func BenchmarkClient(b *testing.B) {
+       var data = []byte("Hello world.\n")
+
+       url := startClientBenchmarkServer(b, HandlerFunc(func(w ResponseWriter, _ *Request) {
+               w.Header().Set("Content-Type", "text/html; charset=utf-8")
+               w.Write(data)
+       }))
+
+       // Do b.N requests to the server.
+       b.StartTimer()
+       for i := 0; i < b.N; i++ {
+               res, err := Get(url)
+               if err != nil {
+                       b.Fatalf("Get: %v", err)
+               }
+               body, err := io.ReadAll(res.Body)
+               res.Body.Close()
+               if err != nil {
+                       b.Fatalf("ReadAll: %v", err)
+               }
+               if !bytes.Equal(body, data) {
+                       b.Fatalf("Got body: %q", body)
+               }
+       }
+       b.StopTimer()
+}
+
+func startClientBenchmarkServer(b *testing.B, handler Handler) string {
        b.ReportAllocs()
        b.StopTimer()
-       defer afterTest(b)
 
-       var data = []byte("Hello world.\n")
-       if server := os.Getenv("TEST_BENCH_SERVER"); server != "" {
+       if server := os.Getenv("GO_TEST_BENCH_SERVER"); server != "" {
                // Server process mode.
-               port := os.Getenv("TEST_BENCH_SERVER_PORT") // can be set by user
+               port := os.Getenv("GO_TEST_BENCH_SERVER_PORT") // can be set by user
                if port == "" {
                        port = "0"
                }
                ln, err := net.Listen("tcp", "localhost:"+port)
                if err != nil {
-                       fmt.Fprintln(os.Stderr, err.Error())
-                       os.Exit(1)
+                       log.Fatal(err)
                }
                fmt.Println(ln.Addr().String())
+
                HandleFunc("/", func(w ResponseWriter, r *Request) {
                        r.ParseForm()
                        if r.Form.Get("stop") != "" {
                                os.Exit(0)
                        }
-                       w.Header().Set("Content-Type", "text/html; charset=utf-8")
-                       w.Write(data)
+                       handler.ServeHTTP(w, r)
                })
                var srv Server
                log.Fatal(srv.Serve(ln))
@@ -5369,8 +5394,8 @@ func BenchmarkClient(b *testing.B) {
 
        // Start server process.
        ctx, cancel := context.WithCancel(context.Background())
-       cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkClient$")
-       cmd.Env = append(cmd.Environ(), "TEST_BENCH_SERVER=yes")
+       cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^"+b.Name()+"$")
+       cmd.Env = append(cmd.Environ(), "GO_TEST_BENCH_SERVER=yes")
        cmd.Stderr = os.Stderr
        stdout, err := cmd.StdoutPipe()
        if err != nil {
@@ -5385,10 +5410,6 @@ func BenchmarkClient(b *testing.B) {
                done <- cmd.Wait()
                close(done)
        }()
-       defer func() {
-               cancel()
-               <-done
-       }()
 
        // Wait for the server in the child process to respond and tell us
        // its listening address, once it's started listening:
@@ -5401,6 +5422,39 @@ func BenchmarkClient(b *testing.B) {
                b.Fatalf("initial probe of child process failed: %v", err)
        }
 
+       // Instruct server process to stop.
+       b.Cleanup(func() {
+               getNoBody(url + "?stop=yes")
+               if err := <-done; err != nil {
+                       b.Fatalf("subprocess failed: %v", err)
+               }
+
+               cancel()
+               <-done
+
+               afterTest(b)
+       })
+
+       return url
+}
+
+func BenchmarkClientGzip(b *testing.B) {
+       const responseSize = 1024 * 1024
+
+       var buf bytes.Buffer
+       gz := gzip.NewWriter(&buf)
+       if _, err := io.CopyN(gz, crand.Reader, responseSize); err != nil {
+               b.Fatal(err)
+       }
+       gz.Close()
+
+       data := buf.Bytes()
+
+       url := startClientBenchmarkServer(b, HandlerFunc(func(w ResponseWriter, _ *Request) {
+               w.Header().Set("Content-Encoding", "gzip")
+               w.Write(data)
+       }))
+
        // Do b.N requests to the server.
        b.StartTimer()
        for i := 0; i < b.N; i++ {
@@ -5408,22 +5462,16 @@ func BenchmarkClient(b *testing.B) {
                if err != nil {
                        b.Fatalf("Get: %v", err)
                }
-               body, err := io.ReadAll(res.Body)
+               n, err := io.Copy(io.Discard, res.Body)
                res.Body.Close()
                if err != nil {
                        b.Fatalf("ReadAll: %v", err)
                }
-               if !bytes.Equal(body, data) {
-                       b.Fatalf("Got body: %q", body)
+               if n != responseSize {
+                       b.Fatalf("ReadAll: expected %d bytes, got %d", responseSize, n)
                }
        }
        b.StopTimer()
-
-       // Instruct server process to stop.
-       getNoBody(url + "?stop=yes")
-       if err := <-done; err != nil {
-               b.Fatalf("subprocess failed: %v", err)
-       }
 }
 
 func BenchmarkServerFakeConnNoKeepAlive(b *testing.B) {
index 572c16a6d838d828b3809a683a0864e59659df97..c5b1a87c5cb112cf4a0a79b5b4133e57fb20aea5 100644 (file)
@@ -11,6 +11,7 @@ package http
 
 import (
        "bufio"
+       "compress/flate"
        "compress/gzip"
        "container/list"
        "context"
@@ -2988,6 +2989,7 @@ type bodyEOFSignal struct {
 }
 
 var errReadOnClosedResBody = errors.New("http: read on closed response body")
+var errConcurrentReadOnResBody = errors.New("http: concurrent read on response body")
 
 func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
        es.mu.Lock()
@@ -3037,37 +3039,98 @@ func (es *bodyEOFSignal) condfn(err error) error {
 }
 
 // gzipReader wraps a response body so it can lazily
-// call gzip.NewReader on the first call to Read
+// get gzip.Reader from the pool on the first call to Read.
+// After Close is called it puts gzip.Reader to the pool immediately
+// if there is no Read in progress or later when Read completes.
 type gzipReader struct {
        _    incomparable
        body *bodyEOFSignal // underlying HTTP/1 response body framing
-       zr   *gzip.Reader   // lazily-initialized gzip reader
-       zerr error          // any error from gzip.NewReader; sticky
+       mu   sync.Mutex     // guards zr and zerr
+       zr   *gzip.Reader
+       zerr error
 }
 
-func (gz *gzipReader) Read(p []byte) (n int, err error) {
+type eofReader struct{}
+
+func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
+func (eofReader) ReadByte() (byte, error)  { return 0, io.EOF }
+
+var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
+
+// gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r.
+func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
+       zr := gzipPool.Get().(*gzip.Reader)
+       if err := zr.Reset(r); err != nil {
+               gzipPoolPut(zr)
+               return nil, err
+       }
+       return zr, nil
+}
+
+// gzipPoolPut puts a gzip.Reader back into the pool.
+func gzipPoolPut(zr *gzip.Reader) {
+       // Reset will allocate bufio.Reader if we pass it anything
+       // other than a flate.Reader, so ensure that it's getting one.
+       var r flate.Reader = eofReader{}
+       zr.Reset(r)
+       gzipPool.Put(zr)
+}
+
+// acquire returns a gzip.Reader for reading response body.
+// The reader must be released after use.
+func (gz *gzipReader) acquire() (*gzip.Reader, error) {
+       gz.mu.Lock()
+       defer gz.mu.Unlock()
+       if gz.zerr != nil {
+               return nil, gz.zerr
+       }
        if gz.zr == nil {
-               if gz.zerr == nil {
-                       gz.zr, gz.zerr = gzip.NewReader(gz.body)
-               }
+               gz.zr, gz.zerr = gzipPoolGet(gz.body)
                if gz.zerr != nil {
-                       return 0, gz.zerr
+                       return nil, gz.zerr
                }
        }
+       ret := gz.zr
+       gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
+       return ret, nil
+}
+
+// release returns the gzip.Reader to the pool if Close was called during Read.
+func (gz *gzipReader) release(zr *gzip.Reader) {
+       gz.mu.Lock()
+       defer gz.mu.Unlock()
+       if gz.zerr == errConcurrentReadOnResBody {
+               gz.zr, gz.zerr = zr, nil
+       } else { // errReadOnClosedResBody
+               gzipPoolPut(zr)
+       }
+}
 
-       gz.body.mu.Lock()
-       if gz.body.closed {
-               err = errReadOnClosedResBody
+// close returns the gzip.Reader to the pool immediately or
+// signals release to do so after Read completes.
+func (gz *gzipReader) close() {
+       gz.mu.Lock()
+       defer gz.mu.Unlock()
+       if gz.zerr == nil && gz.zr != nil {
+               gzipPoolPut(gz.zr)
+               gz.zr = nil
        }
-       gz.body.mu.Unlock()
+       gz.zerr = errReadOnClosedResBody
+}
 
+func (gz *gzipReader) Read(p []byte) (n int, err error) {
+       zr, err := gz.acquire()
        if err != nil {
                return 0, err
        }
-       return gz.zr.Read(p)
+       defer gz.release(zr)
+
+       return zr.Read(p)
 }
 
 func (gz *gzipReader) Close() error {
+       gz.close()
+
        return gz.body.Close()
 }