From 861c90c907db1129dcd1540eecd3c66b6309db7a Mon Sep 17 00:00:00 2001 From: Alexander Yastrebov Date: Wed, 3 Sep 2025 10:09:08 +0000 Subject: [PATCH] net/http: pool transport gzip readers MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit goos: linux goarch: amd64 pkg: net/http │ HEAD~1 │ HEAD │ │ sec/op │ sec/op vs base │ ClientGzip-8 621.0µ ± 2% 616.3µ ± 10% ~ (p=0.971 n=10) │ HEAD~1 │ HEAD │ │ B/op │ B/op vs base │ ClientGzip-8 49.765Ki ± 0% 9.514Ki ± 2% -80.88% (p=0.000 n=10) │ HEAD~1 │ HEAD │ │ allocs/op │ allocs/op vs base │ ClientGzip-8 57.00 ± 0% 52.00 ± 0% -8.77% (p=0.000 n=10) Allocation saving comes from absent compress/flate.(*dictDecoder).init This change also improves concurrent body read detection by returning an explicit error. Updates #61353 Change-Id: I380acfca912dc009b3b9c8283e27b3526cedd546 GitHub-Last-Rev: df12f6a48af4854ba686fe431a9aeb6d9ba3c303 GitHub-Pull-Request: golang/go#61390 Reviewed-on: https://go-review.googlesource.com/c/go/+/510255 Reviewed-by: Sean Liao Auto-Submit: Michael Pratt Reviewed-by: Michael Pratt LUCI-TryBot-Result: Go LUCI Reviewed-by: Cherry Mui --- src/net/http/serve_test.go | 102 +++++++++++++++++++++++++++---------- src/net/http/transport.go | 89 +++++++++++++++++++++++++++----- 2 files changed, 151 insertions(+), 40 deletions(-) diff --git a/src/net/http/serve_test.go b/src/net/http/serve_test.go index 7e3e490af3..aee6288f3b 100644 --- a/src/net/http/serve_test.go +++ b/src/net/http/serve_test.go @@ -12,6 +12,7 @@ import ( "compress/gzip" "compress/zlib" "context" + crand "crypto/rand" "crypto/tls" "crypto/x509" "encoding/json" @@ -5281,8 +5282,8 @@ func benchmarkClientServerParallel(b *testing.B, parallelism int, mode testMode) func BenchmarkServer(b *testing.B) { b.ReportAllocs() // Child process mode; - if url := os.Getenv("TEST_BENCH_SERVER_URL"); url != "" { - n, err := strconv.Atoi(os.Getenv("TEST_BENCH_CLIENT_N")) + if url := os.Getenv("GO_TEST_BENCH_SERVER_URL"); url != "" { + n, err := strconv.Atoi(os.Getenv("GO_TEST_BENCH_CLIENT_N")) if err != nil { panic(err) } @@ -5316,8 +5317,8 @@ func BenchmarkServer(b *testing.B) { cmd := testenv.Command(b, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkServer$") cmd.Env = append([]string{ - fmt.Sprintf("TEST_BENCH_CLIENT_N=%d", b.N), - fmt.Sprintf("TEST_BENCH_SERVER_URL=%s", ts.URL), + fmt.Sprintf("GO_TEST_BENCH_CLIENT_N=%d", b.N), + fmt.Sprintf("GO_TEST_BENCH_SERVER_URL=%s", ts.URL), }, os.Environ()...) out, err := cmd.CombinedOutput() if err != nil { @@ -5338,30 +5339,54 @@ func getNoBody(urlStr string) (*Response, error) { // A benchmark for profiling the client without the HTTP server code. // The server code runs in a subprocess. func BenchmarkClient(b *testing.B) { + var data = []byte("Hello world.\n") + + url := startClientBenchmarkServer(b, HandlerFunc(func(w ResponseWriter, _ *Request) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.Write(data) + })) + + // Do b.N requests to the server. + b.StartTimer() + for i := 0; i < b.N; i++ { + res, err := Get(url) + if err != nil { + b.Fatalf("Get: %v", err) + } + body, err := io.ReadAll(res.Body) + res.Body.Close() + if err != nil { + b.Fatalf("ReadAll: %v", err) + } + if !bytes.Equal(body, data) { + b.Fatalf("Got body: %q", body) + } + } + b.StopTimer() +} + +func startClientBenchmarkServer(b *testing.B, handler Handler) string { b.ReportAllocs() b.StopTimer() - defer afterTest(b) - var data = []byte("Hello world.\n") - if server := os.Getenv("TEST_BENCH_SERVER"); server != "" { + if server := os.Getenv("GO_TEST_BENCH_SERVER"); server != "" { // Server process mode. - port := os.Getenv("TEST_BENCH_SERVER_PORT") // can be set by user + port := os.Getenv("GO_TEST_BENCH_SERVER_PORT") // can be set by user if port == "" { port = "0" } ln, err := net.Listen("tcp", "localhost:"+port) if err != nil { - fmt.Fprintln(os.Stderr, err.Error()) - os.Exit(1) + log.Fatal(err) } fmt.Println(ln.Addr().String()) + HandleFunc("/", func(w ResponseWriter, r *Request) { r.ParseForm() if r.Form.Get("stop") != "" { os.Exit(0) } - w.Header().Set("Content-Type", "text/html; charset=utf-8") - w.Write(data) + handler.ServeHTTP(w, r) }) var srv Server log.Fatal(srv.Serve(ln)) @@ -5369,8 +5394,8 @@ func BenchmarkClient(b *testing.B) { // Start server process. ctx, cancel := context.WithCancel(context.Background()) - cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkClient$") - cmd.Env = append(cmd.Environ(), "TEST_BENCH_SERVER=yes") + cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^"+b.Name()+"$") + cmd.Env = append(cmd.Environ(), "GO_TEST_BENCH_SERVER=yes") cmd.Stderr = os.Stderr stdout, err := cmd.StdoutPipe() if err != nil { @@ -5385,10 +5410,6 @@ func BenchmarkClient(b *testing.B) { done <- cmd.Wait() close(done) }() - defer func() { - cancel() - <-done - }() // Wait for the server in the child process to respond and tell us // its listening address, once it's started listening: @@ -5401,6 +5422,39 @@ func BenchmarkClient(b *testing.B) { b.Fatalf("initial probe of child process failed: %v", err) } + // Instruct server process to stop. + b.Cleanup(func() { + getNoBody(url + "?stop=yes") + if err := <-done; err != nil { + b.Fatalf("subprocess failed: %v", err) + } + + cancel() + <-done + + afterTest(b) + }) + + return url +} + +func BenchmarkClientGzip(b *testing.B) { + const responseSize = 1024 * 1024 + + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + if _, err := io.CopyN(gz, crand.Reader, responseSize); err != nil { + b.Fatal(err) + } + gz.Close() + + data := buf.Bytes() + + url := startClientBenchmarkServer(b, HandlerFunc(func(w ResponseWriter, _ *Request) { + w.Header().Set("Content-Encoding", "gzip") + w.Write(data) + })) + // Do b.N requests to the server. b.StartTimer() for i := 0; i < b.N; i++ { @@ -5408,22 +5462,16 @@ func BenchmarkClient(b *testing.B) { if err != nil { b.Fatalf("Get: %v", err) } - body, err := io.ReadAll(res.Body) + n, err := io.Copy(io.Discard, res.Body) res.Body.Close() if err != nil { b.Fatalf("ReadAll: %v", err) } - if !bytes.Equal(body, data) { - b.Fatalf("Got body: %q", body) + if n != responseSize { + b.Fatalf("ReadAll: expected %d bytes, got %d", responseSize, n) } } b.StopTimer() - - // Instruct server process to stop. - getNoBody(url + "?stop=yes") - if err := <-done; err != nil { - b.Fatalf("subprocess failed: %v", err) - } } func BenchmarkServerFakeConnNoKeepAlive(b *testing.B) { diff --git a/src/net/http/transport.go b/src/net/http/transport.go index 572c16a6d8..c5b1a87c5c 100644 --- a/src/net/http/transport.go +++ b/src/net/http/transport.go @@ -11,6 +11,7 @@ package http import ( "bufio" + "compress/flate" "compress/gzip" "container/list" "context" @@ -2988,6 +2989,7 @@ type bodyEOFSignal struct { } var errReadOnClosedResBody = errors.New("http: read on closed response body") +var errConcurrentReadOnResBody = errors.New("http: concurrent read on response body") func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { es.mu.Lock() @@ -3037,37 +3039,98 @@ func (es *bodyEOFSignal) condfn(err error) error { } // gzipReader wraps a response body so it can lazily -// call gzip.NewReader on the first call to Read +// get gzip.Reader from the pool on the first call to Read. +// After Close is called it puts gzip.Reader to the pool immediately +// if there is no Read in progress or later when Read completes. type gzipReader struct { _ incomparable body *bodyEOFSignal // underlying HTTP/1 response body framing - zr *gzip.Reader // lazily-initialized gzip reader - zerr error // any error from gzip.NewReader; sticky + mu sync.Mutex // guards zr and zerr + zr *gzip.Reader + zerr error } -func (gz *gzipReader) Read(p []byte) (n int, err error) { +type eofReader struct{} + +func (eofReader) Read([]byte) (int, error) { return 0, io.EOF } +func (eofReader) ReadByte() (byte, error) { return 0, io.EOF } + +var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }} + +// gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r. +func gzipPoolGet(r io.Reader) (*gzip.Reader, error) { + zr := gzipPool.Get().(*gzip.Reader) + if err := zr.Reset(r); err != nil { + gzipPoolPut(zr) + return nil, err + } + return zr, nil +} + +// gzipPoolPut puts a gzip.Reader back into the pool. +func gzipPoolPut(zr *gzip.Reader) { + // Reset will allocate bufio.Reader if we pass it anything + // other than a flate.Reader, so ensure that it's getting one. + var r flate.Reader = eofReader{} + zr.Reset(r) + gzipPool.Put(zr) +} + +// acquire returns a gzip.Reader for reading response body. +// The reader must be released after use. +func (gz *gzipReader) acquire() (*gzip.Reader, error) { + gz.mu.Lock() + defer gz.mu.Unlock() + if gz.zerr != nil { + return nil, gz.zerr + } if gz.zr == nil { - if gz.zerr == nil { - gz.zr, gz.zerr = gzip.NewReader(gz.body) - } + gz.zr, gz.zerr = gzipPoolGet(gz.body) if gz.zerr != nil { - return 0, gz.zerr + return nil, gz.zerr } } + ret := gz.zr + gz.zr, gz.zerr = nil, errConcurrentReadOnResBody + return ret, nil +} + +// release returns the gzip.Reader to the pool if Close was called during Read. +func (gz *gzipReader) release(zr *gzip.Reader) { + gz.mu.Lock() + defer gz.mu.Unlock() + if gz.zerr == errConcurrentReadOnResBody { + gz.zr, gz.zerr = zr, nil + } else { // errReadOnClosedResBody + gzipPoolPut(zr) + } +} - gz.body.mu.Lock() - if gz.body.closed { - err = errReadOnClosedResBody +// close returns the gzip.Reader to the pool immediately or +// signals release to do so after Read completes. +func (gz *gzipReader) close() { + gz.mu.Lock() + defer gz.mu.Unlock() + if gz.zerr == nil && gz.zr != nil { + gzipPoolPut(gz.zr) + gz.zr = nil } - gz.body.mu.Unlock() + gz.zerr = errReadOnClosedResBody +} +func (gz *gzipReader) Read(p []byte) (n int, err error) { + zr, err := gz.acquire() if err != nil { return 0, err } - return gz.zr.Read(p) + defer gz.release(zr) + + return zr.Read(p) } func (gz *gzipReader) Close() error { + gz.close() + return gz.body.Close() } -- 2.52.0