"compress/gzip"
"compress/zlib"
"context"
+ crand "crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/json"
func BenchmarkServer(b *testing.B) {
b.ReportAllocs()
// Child process mode;
- if url := os.Getenv("TEST_BENCH_SERVER_URL"); url != "" {
- n, err := strconv.Atoi(os.Getenv("TEST_BENCH_CLIENT_N"))
+ if url := os.Getenv("GO_TEST_BENCH_SERVER_URL"); url != "" {
+ n, err := strconv.Atoi(os.Getenv("GO_TEST_BENCH_CLIENT_N"))
if err != nil {
panic(err)
}
cmd := testenv.Command(b, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkServer$")
cmd.Env = append([]string{
- fmt.Sprintf("TEST_BENCH_CLIENT_N=%d", b.N),
- fmt.Sprintf("TEST_BENCH_SERVER_URL=%s", ts.URL),
+ fmt.Sprintf("GO_TEST_BENCH_CLIENT_N=%d", b.N),
+ fmt.Sprintf("GO_TEST_BENCH_SERVER_URL=%s", ts.URL),
}, os.Environ()...)
out, err := cmd.CombinedOutput()
if err != nil {
// A benchmark for profiling the client without the HTTP server code.
// The server code runs in a subprocess.
func BenchmarkClient(b *testing.B) {
+ var data = []byte("Hello world.\n")
+
+ url := startClientBenchmarkServer(b, HandlerFunc(func(w ResponseWriter, _ *Request) {
+ w.Header().Set("Content-Type", "text/html; charset=utf-8")
+ w.Write(data)
+ }))
+
+ // Do b.N requests to the server.
+ b.StartTimer()
+ for i := 0; i < b.N; i++ {
+ res, err := Get(url)
+ if err != nil {
+ b.Fatalf("Get: %v", err)
+ }
+ body, err := io.ReadAll(res.Body)
+ res.Body.Close()
+ if err != nil {
+ b.Fatalf("ReadAll: %v", err)
+ }
+ if !bytes.Equal(body, data) {
+ b.Fatalf("Got body: %q", body)
+ }
+ }
+ b.StopTimer()
+}
+
+func startClientBenchmarkServer(b *testing.B, handler Handler) string {
b.ReportAllocs()
b.StopTimer()
- defer afterTest(b)
- var data = []byte("Hello world.\n")
- if server := os.Getenv("TEST_BENCH_SERVER"); server != "" {
+ if server := os.Getenv("GO_TEST_BENCH_SERVER"); server != "" {
// Server process mode.
- port := os.Getenv("TEST_BENCH_SERVER_PORT") // can be set by user
+ port := os.Getenv("GO_TEST_BENCH_SERVER_PORT") // can be set by user
if port == "" {
port = "0"
}
ln, err := net.Listen("tcp", "localhost:"+port)
if err != nil {
- fmt.Fprintln(os.Stderr, err.Error())
- os.Exit(1)
+ log.Fatal(err)
}
fmt.Println(ln.Addr().String())
+
HandleFunc("/", func(w ResponseWriter, r *Request) {
r.ParseForm()
if r.Form.Get("stop") != "" {
os.Exit(0)
}
- w.Header().Set("Content-Type", "text/html; charset=utf-8")
- w.Write(data)
+ handler.ServeHTTP(w, r)
})
var srv Server
log.Fatal(srv.Serve(ln))
// Start server process.
ctx, cancel := context.WithCancel(context.Background())
- cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkClient$")
- cmd.Env = append(cmd.Environ(), "TEST_BENCH_SERVER=yes")
+ cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^"+b.Name()+"$")
+ cmd.Env = append(cmd.Environ(), "GO_TEST_BENCH_SERVER=yes")
cmd.Stderr = os.Stderr
stdout, err := cmd.StdoutPipe()
if err != nil {
done <- cmd.Wait()
close(done)
}()
- defer func() {
- cancel()
- <-done
- }()
// Wait for the server in the child process to respond and tell us
// its listening address, once it's started listening:
b.Fatalf("initial probe of child process failed: %v", err)
}
+ // Instruct server process to stop.
+ b.Cleanup(func() {
+ getNoBody(url + "?stop=yes")
+ if err := <-done; err != nil {
+ b.Fatalf("subprocess failed: %v", err)
+ }
+
+ cancel()
+ <-done
+
+ afterTest(b)
+ })
+
+ return url
+}
+
+func BenchmarkClientGzip(b *testing.B) {
+ const responseSize = 1024 * 1024
+
+ var buf bytes.Buffer
+ gz := gzip.NewWriter(&buf)
+ if _, err := io.CopyN(gz, crand.Reader, responseSize); err != nil {
+ b.Fatal(err)
+ }
+ gz.Close()
+
+ data := buf.Bytes()
+
+ url := startClientBenchmarkServer(b, HandlerFunc(func(w ResponseWriter, _ *Request) {
+ w.Header().Set("Content-Encoding", "gzip")
+ w.Write(data)
+ }))
+
// Do b.N requests to the server.
b.StartTimer()
for i := 0; i < b.N; i++ {
if err != nil {
b.Fatalf("Get: %v", err)
}
- body, err := io.ReadAll(res.Body)
+ n, err := io.Copy(io.Discard, res.Body)
res.Body.Close()
if err != nil {
b.Fatalf("ReadAll: %v", err)
}
- if !bytes.Equal(body, data) {
- b.Fatalf("Got body: %q", body)
+ if n != responseSize {
+ b.Fatalf("ReadAll: expected %d bytes, got %d", responseSize, n)
}
}
b.StopTimer()
-
- // Instruct server process to stop.
- getNoBody(url + "?stop=yes")
- if err := <-done; err != nil {
- b.Fatalf("subprocess failed: %v", err)
- }
}
func BenchmarkServerFakeConnNoKeepAlive(b *testing.B) {
import (
"bufio"
+ "compress/flate"
"compress/gzip"
"container/list"
"context"
}
var errReadOnClosedResBody = errors.New("http: read on closed response body")
+var errConcurrentReadOnResBody = errors.New("http: concurrent read on response body")
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
es.mu.Lock()
}
// gzipReader wraps a response body so it can lazily
-// call gzip.NewReader on the first call to Read
+// get gzip.Reader from the pool on the first call to Read.
+// After Close is called it puts gzip.Reader to the pool immediately
+// if there is no Read in progress or later when Read completes.
type gzipReader struct {
_ incomparable
body *bodyEOFSignal // underlying HTTP/1 response body framing
- zr *gzip.Reader // lazily-initialized gzip reader
- zerr error // any error from gzip.NewReader; sticky
+ mu sync.Mutex // guards zr and zerr
+ zr *gzip.Reader
+ zerr error
}
-func (gz *gzipReader) Read(p []byte) (n int, err error) {
+type eofReader struct{}
+
+func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
+func (eofReader) ReadByte() (byte, error) { return 0, io.EOF }
+
+var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
+
+// gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r.
+func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
+ zr := gzipPool.Get().(*gzip.Reader)
+ if err := zr.Reset(r); err != nil {
+ gzipPoolPut(zr)
+ return nil, err
+ }
+ return zr, nil
+}
+
+// gzipPoolPut puts a gzip.Reader back into the pool.
+func gzipPoolPut(zr *gzip.Reader) {
+ // Reset will allocate bufio.Reader if we pass it anything
+ // other than a flate.Reader, so ensure that it's getting one.
+ var r flate.Reader = eofReader{}
+ zr.Reset(r)
+ gzipPool.Put(zr)
+}
+
+// acquire returns a gzip.Reader for reading response body.
+// The reader must be released after use.
+func (gz *gzipReader) acquire() (*gzip.Reader, error) {
+ gz.mu.Lock()
+ defer gz.mu.Unlock()
+ if gz.zerr != nil {
+ return nil, gz.zerr
+ }
if gz.zr == nil {
- if gz.zerr == nil {
- gz.zr, gz.zerr = gzip.NewReader(gz.body)
- }
+ gz.zr, gz.zerr = gzipPoolGet(gz.body)
if gz.zerr != nil {
- return 0, gz.zerr
+ return nil, gz.zerr
}
}
+ ret := gz.zr
+ gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
+ return ret, nil
+}
+
+// release returns the gzip.Reader to the pool if Close was called during Read.
+func (gz *gzipReader) release(zr *gzip.Reader) {
+ gz.mu.Lock()
+ defer gz.mu.Unlock()
+ if gz.zerr == errConcurrentReadOnResBody {
+ gz.zr, gz.zerr = zr, nil
+ } else { // errReadOnClosedResBody
+ gzipPoolPut(zr)
+ }
+}
- gz.body.mu.Lock()
- if gz.body.closed {
- err = errReadOnClosedResBody
+// close returns the gzip.Reader to the pool immediately or
+// signals release to do so after Read completes.
+func (gz *gzipReader) close() {
+ gz.mu.Lock()
+ defer gz.mu.Unlock()
+ if gz.zerr == nil && gz.zr != nil {
+ gzipPoolPut(gz.zr)
+ gz.zr = nil
}
- gz.body.mu.Unlock()
+ gz.zerr = errReadOnClosedResBody
+}
+func (gz *gzipReader) Read(p []byte) (n int, err error) {
+ zr, err := gz.acquire()
if err != nil {
return 0, err
}
- return gz.zr.Read(p)
+ defer gz.release(zr)
+
+ return zr.Read(p)
}
func (gz *gzipReader) Close() error {
+ gz.close()
+
return gz.body.Close()
}