From: Ben Burkert Date: Mon, 21 May 2018 19:56:02 +0000 (-0700) Subject: net: use splice(2) on Linux when reading from UnixConn, rework splice tests X-Git-Tag: go1.12beta1~1165 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=fc5edaca30801f2d1acb7a9d39943638e6d4c1c1;p=gostls13.git net: use splice(2) on Linux when reading from UnixConn, rework splice tests Rework the splice tests and benchmarks. Move the reading and writing of the spliced connections to child processes so that the I/O is not part of benchmarks or profiles. Enable the use of splice(2) when reading from a unix connection and writing to a TCP connection. The updated benchmarks show a performance gain when using splice(2) to copy large chunks of data that the original benchmark did not capture. name old time/op new time/op delta Splice/tcp-to-tcp/1024-8 5.01µs ± 2% 5.08µs ± 3% ~ (p=0.068 n=8+10) Splice/tcp-to-tcp/2048-8 4.76µs ± 5% 4.65µs ± 3% -2.36% (p=0.015 n=9+8) Splice/tcp-to-tcp/4096-8 4.91µs ± 2% 4.98µs ± 5% ~ (p=0.315 n=9+10) Splice/tcp-to-tcp/8192-8 5.50µs ± 4% 5.44µs ± 3% ~ (p=0.758 n=7+9) Splice/tcp-to-tcp/16384-8 7.65µs ± 7% 6.53µs ± 3% -14.65% (p=0.000 n=10+9) Splice/tcp-to-tcp/32768-8 15.3µs ± 7% 8.5µs ± 5% -44.21% (p=0.000 n=10+10) Splice/tcp-to-tcp/65536-8 30.0µs ± 6% 15.7µs ± 1% -47.58% (p=0.000 n=10+8) Splice/tcp-to-tcp/131072-8 59.2µs ± 2% 27.4µs ± 5% -53.75% (p=0.000 n=9+9) Splice/tcp-to-tcp/262144-8 121µs ± 4% 54µs ±19% -55.56% (p=0.000 n=9+10) Splice/tcp-to-tcp/524288-8 247µs ± 6% 108µs ±12% -56.34% (p=0.000 n=10+10) Splice/tcp-to-tcp/1048576-8 490µs ± 4% 199µs ±12% -59.31% (p=0.000 n=8+10) Splice/unix-to-tcp/1024-8 1.20µs ± 2% 1.35µs ± 7% +12.47% (p=0.000 n=10+10) Splice/unix-to-tcp/2048-8 1.33µs ±12% 1.57µs ± 4% +17.85% (p=0.000 n=9+10) Splice/unix-to-tcp/4096-8 2.24µs ± 4% 1.67µs ± 4% -25.14% (p=0.000 n=9+10) Splice/unix-to-tcp/8192-8 4.59µs ± 8% 2.20µs ±10% -52.01% (p=0.000 n=10+10) Splice/unix-to-tcp/16384-8 8.46µs ±13% 3.48µs ± 6% -58.91% (p=0.000 n=10+10) Splice/unix-to-tcp/32768-8 18.5µs ± 9% 6.1µs ± 9% -66.99% (p=0.000 n=10+10) Splice/unix-to-tcp/65536-8 35.9µs ± 7% 13.5µs ± 6% -62.40% (p=0.000 n=10+9) Splice/unix-to-tcp/131072-8 79.4µs ± 6% 25.7µs ± 4% -67.62% (p=0.000 n=10+9) Splice/unix-to-tcp/262144-8 157µs ± 4% 54µs ± 8% -65.63% (p=0.000 n=10+10) Splice/unix-to-tcp/524288-8 311µs ± 3% 107µs ± 8% -65.74% (p=0.000 n=10+10) Splice/unix-to-tcp/1048576-8 643µs ± 4% 185µs ±32% -71.21% (p=0.000 n=10+10) name old speed new speed delta Splice/tcp-to-tcp/1024-8 204MB/s ± 2% 202MB/s ± 3% ~ (p=0.068 n=8+10) Splice/tcp-to-tcp/2048-8 430MB/s ± 5% 441MB/s ± 3% +2.39% (p=0.014 n=9+8) Splice/tcp-to-tcp/4096-8 833MB/s ± 2% 823MB/s ± 5% ~ (p=0.315 n=9+10) Splice/tcp-to-tcp/8192-8 1.49GB/s ± 4% 1.51GB/s ± 3% ~ (p=0.758 n=7+9) Splice/tcp-to-tcp/16384-8 2.14GB/s ± 7% 2.51GB/s ± 3% +17.03% (p=0.000 n=10+9) Splice/tcp-to-tcp/32768-8 2.15GB/s ± 7% 3.85GB/s ± 5% +79.11% (p=0.000 n=10+10) Splice/tcp-to-tcp/65536-8 2.19GB/s ± 5% 4.17GB/s ± 1% +90.65% (p=0.000 n=10+8) Splice/tcp-to-tcp/131072-8 2.22GB/s ± 2% 4.79GB/s ± 4% +116.26% (p=0.000 n=9+9) Splice/tcp-to-tcp/262144-8 2.17GB/s ± 4% 4.93GB/s ±17% +127.25% (p=0.000 n=9+10) Splice/tcp-to-tcp/524288-8 2.13GB/s ± 6% 4.89GB/s ±13% +130.15% (p=0.000 n=10+10) Splice/tcp-to-tcp/1048576-8 2.09GB/s ±10% 5.29GB/s ±11% +153.36% (p=0.000 n=10+10) Splice/unix-to-tcp/1024-8 850MB/s ± 2% 757MB/s ± 7% -10.94% (p=0.000 n=10+10) Splice/unix-to-tcp/2048-8 1.54GB/s ±11% 1.31GB/s ± 3% -15.32% (p=0.000 n=9+10) Splice/unix-to-tcp/4096-8 1.83GB/s ± 4% 2.45GB/s ± 4% +33.59% (p=0.000 n=9+10) Splice/unix-to-tcp/8192-8 1.79GB/s ± 9% 3.73GB/s ± 9% +108.05% (p=0.000 n=10+10) Splice/unix-to-tcp/16384-8 1.95GB/s ±13% 4.68GB/s ± 3% +139.80% (p=0.000 n=10+9) Splice/unix-to-tcp/32768-8 1.78GB/s ± 9% 5.38GB/s ±10% +202.71% (p=0.000 n=10+10) Splice/unix-to-tcp/65536-8 1.83GB/s ± 8% 4.85GB/s ± 6% +165.70% (p=0.000 n=10+9) Splice/unix-to-tcp/131072-8 1.65GB/s ± 6% 5.10GB/s ± 4% +208.77% (p=0.000 n=10+9) Splice/unix-to-tcp/262144-8 1.67GB/s ± 4% 4.87GB/s ± 7% +191.19% (p=0.000 n=10+10) Splice/unix-to-tcp/524288-8 1.69GB/s ± 3% 4.93GB/s ± 7% +192.38% (p=0.000 n=10+10) Splice/unix-to-tcp/1048576-8 1.63GB/s ± 3% 5.60GB/s ±44% +243.26% (p=0.000 n=10+9) Change-Id: I1eae4c3459c918558c70fc42283db22ff7e0442c Reviewed-on: https://go-review.googlesource.com/113997 Reviewed-by: Brad Fitzpatrick --- diff --git a/src/net/splice_linux.go b/src/net/splice_linux.go index b055f93351..8a4d55af62 100644 --- a/src/net/splice_linux.go +++ b/src/net/splice_linux.go @@ -11,7 +11,7 @@ import ( // splice transfers data from r to c using the splice system call to minimize // copies from and to userspace. c must be a TCP connection. Currently, splice -// is only enabled if r is also a TCP connection. +// is only enabled if r is a TCP or Unix connection. // // If splice returns handled == false, it has performed no work. func splice(c *netFD, r io.Reader) (written int64, err error, handled bool) { @@ -23,11 +23,17 @@ func splice(c *netFD, r io.Reader) (written int64, err error, handled bool) { return 0, nil, true } } - s, ok := r.(*TCPConn) - if !ok { + + var s *netFD + if tc, ok := r.(*TCPConn); ok { + s = tc.fd + } else if uc, ok := r.(*UnixConn); ok { + s = uc.fd + } else { return 0, nil, false } - written, handled, sc, err := poll.Splice(&c.pfd, &s.fd.pfd, remain) + + written, handled, sc, err := poll.Splice(&c.pfd, &s.pfd, remain) if lr != nil { lr.N -= written } diff --git a/src/net/splice_test.go b/src/net/splice_test.go index ffe71ae384..3e7fd8251b 100644 --- a/src/net/splice_test.go +++ b/src/net/splice_test.go @@ -7,239 +7,103 @@ package net import ( - "bytes" - "fmt" "io" "io/ioutil" + "log" + "os" + "os/exec" + "strconv" "sync" "testing" ) func TestSplice(t *testing.T) { - t.Run("simple", testSpliceSimple) - t.Run("multipleWrite", testSpliceMultipleWrite) - t.Run("big", testSpliceBig) - t.Run("honorsLimitedReader", testSpliceHonorsLimitedReader) - t.Run("readerAtEOF", testSpliceReaderAtEOF) - t.Run("issue25985", testSpliceIssue25985) + t.Run("tcp-to-tcp", func(t *testing.T) { testSplice(t, "tcp", "tcp") }) + t.Run("unix-to-tcp", func(t *testing.T) { testSplice(t, "unix", "tcp") }) } -func testSpliceSimple(t *testing.T) { - srv, err := newSpliceTestServer() - if err != nil { - t.Fatal(err) - } - defer srv.Close() - copyDone := srv.Copy() - msg := []byte("splice test") - if _, err := srv.Write(msg); err != nil { - t.Fatal(err) - } - got := make([]byte, len(msg)) - if _, err := io.ReadFull(srv, got); err != nil { - t.Fatal(err) - } - if !bytes.Equal(got, msg) { - t.Errorf("got %q, wrote %q", got, msg) - } - srv.CloseWrite() - srv.CloseRead() - if err := <-copyDone; err != nil { - t.Errorf("splice: %v", err) - } +func testSplice(t *testing.T, upNet, downNet string) { + t.Run("simple", spliceTestCase{upNet, downNet, 128, 128, 0}.test) + t.Run("multipleWrite", spliceTestCase{upNet, downNet, 4096, 1 << 20, 0}.test) + t.Run("big", spliceTestCase{upNet, downNet, 5 << 20, 1 << 30, 0}.test) + t.Run("honorsLimitedReader", spliceTestCase{upNet, downNet, 4096, 1 << 20, 1 << 10}.test) + t.Run("updatesLimitedReaderN", spliceTestCase{upNet, downNet, 1024, 4096, 4096 + 100}.test) + t.Run("limitedReaderAtLimit", spliceTestCase{upNet, downNet, 32, 128, 128}.test) + t.Run("readerAtEOF", func(t *testing.T) { testSpliceReaderAtEOF(t, upNet, downNet) }) + t.Run("issue25985", func(t *testing.T) { testSpliceIssue25985(t, upNet, downNet) }) } -func testSpliceMultipleWrite(t *testing.T) { - srv, err := newSpliceTestServer() - if err != nil { - t.Fatal(err) - } - defer srv.Close() - copyDone := srv.Copy() - msg1 := []byte("splice test part 1 ") - msg2 := []byte(" splice test part 2") - if _, err := srv.Write(msg1); err != nil { - t.Fatalf("Write: %v", err) - } - if _, err := srv.Write(msg2); err != nil { - t.Fatal(err) - } - got := make([]byte, len(msg1)+len(msg2)) - if _, err := io.ReadFull(srv, got); err != nil { - t.Fatal(err) - } - want := append(msg1, msg2...) - if !bytes.Equal(got, want) { - t.Errorf("got %q, wrote %q", got, want) - } - srv.CloseWrite() - srv.CloseRead() - if err := <-copyDone; err != nil { - t.Errorf("splice: %v", err) - } -} +type spliceTestCase struct { + upNet, downNet string -func testSpliceBig(t *testing.T) { - // The maximum amount of data that internal/poll.Splice will use in a - // splice(2) call is 4 << 20. Use a bigger size here so that we test an - // amount that doesn't fit in a single call. - size := 5 << 20 - srv, err := newSpliceTestServer() - if err != nil { - t.Fatal(err) - } - defer srv.Close() - big := make([]byte, size) - copyDone := srv.Copy() - type readResult struct { - b []byte - err error - } - readDone := make(chan readResult) - go func() { - got := make([]byte, len(big)) - _, err := io.ReadFull(srv, got) - readDone <- readResult{got, err} - }() - if _, err := srv.Write(big); err != nil { - t.Fatal(err) - } - res := <-readDone - if res.err != nil { - t.Fatal(res.err) - } - got := res.b - if !bytes.Equal(got, big) { - t.Errorf("input and output differ") - } - srv.CloseWrite() - srv.CloseRead() - if err := <-copyDone; err != nil { - t.Errorf("splice: %v", err) - } -} - -func testSpliceHonorsLimitedReader(t *testing.T) { - t.Run("stopsAfterN", testSpliceStopsAfterN) - t.Run("updatesN", testSpliceUpdatesN) - t.Run("readerAtLimit", testSpliceReaderAtLimit) + chunkSize, totalSize int + limitReadSize int } -func testSpliceStopsAfterN(t *testing.T) { - clientUp, serverUp, err := spliceTestSocketPair("tcp") +func (tc spliceTestCase) test(t *testing.T) { + clientUp, serverUp, err := spliceTestSocketPair(tc.upNet) if err != nil { t.Fatal(err) } - defer clientUp.Close() defer serverUp.Close() - clientDown, serverDown, err := spliceTestSocketPair("tcp") + cleanup, err := startSpliceClient(clientUp, "w", tc.chunkSize, tc.totalSize) if err != nil { t.Fatal(err) } - defer clientDown.Close() - defer serverDown.Close() - count := 128 - copyDone := make(chan error) - lr := &io.LimitedReader{ - N: int64(count), - R: serverUp, - } - go func() { - _, err := io.Copy(serverDown, lr) - serverDown.Close() - copyDone <- err - }() - msg := make([]byte, 2*count) - if _, err := clientUp.Write(msg); err != nil { - t.Fatal(err) - } - clientUp.Close() - var buf bytes.Buffer - if _, err := io.Copy(&buf, clientDown); err != nil { - t.Fatal(err) - } - if buf.Len() != count { - t.Errorf("splice transferred %d bytes, want to stop after %d", buf.Len(), count) - } - clientDown.Close() - if err := <-copyDone; err != nil { - t.Errorf("splice: %v", err) - } -} - -func testSpliceUpdatesN(t *testing.T) { - clientUp, serverUp, err := spliceTestSocketPair("tcp") + defer cleanup() + clientDown, serverDown, err := spliceTestSocketPair(tc.downNet) if err != nil { t.Fatal(err) } - defer clientUp.Close() - defer serverUp.Close() - clientDown, serverDown, err := spliceTestSocketPair("tcp") - if err != nil { - t.Fatal(err) - } - defer clientDown.Close() defer serverDown.Close() - count := 128 - copyDone := make(chan error) - lr := &io.LimitedReader{ - N: int64(100 + count), - R: serverUp, - } - go func() { - _, err := io.Copy(serverDown, lr) - copyDone <- err - }() - msg := make([]byte, count) - if _, err := clientUp.Write(msg); err != nil { - t.Fatal(err) - } - clientUp.Close() - got := make([]byte, count) - if _, err := io.ReadFull(clientDown, got); err != nil { + cleanup, err = startSpliceClient(clientDown, "r", tc.chunkSize, tc.totalSize) + if err != nil { t.Fatal(err) } - clientDown.Close() - if err := <-copyDone; err != nil { - t.Errorf("splice: %v", err) - } - wantN := int64(100) - if lr.N != wantN { - t.Errorf("lr.N = %d, want %d", lr.N, wantN) - } -} + defer cleanup() + var ( + r io.Reader = serverUp + size = tc.totalSize + ) + if tc.limitReadSize > 0 { + if tc.limitReadSize < size { + size = tc.limitReadSize + } -func testSpliceReaderAtLimit(t *testing.T) { - clientUp, serverUp, err := spliceTestSocketPair("tcp") - if err != nil { - t.Fatal(err) + r = &io.LimitedReader{ + N: int64(tc.limitReadSize), + R: serverUp, + } + defer serverUp.Close() } - defer clientUp.Close() - defer serverUp.Close() - clientDown, serverDown, err := spliceTestSocketPair("tcp") + n, err := io.Copy(serverDown, r) + serverDown.Close() if err != nil { t.Fatal(err) } - defer clientDown.Close() - defer serverDown.Close() - - lr := &io.LimitedReader{ - N: 0, - R: serverUp, + if want := int64(size); want != n { + t.Errorf("want %d bytes spliced, got %d", want, n) } - _, err, handled := splice(serverDown.(*TCPConn).fd, lr) - if !handled { - t.Errorf("exhausted LimitedReader: got err = %v, handled = %t, want handled = true", err, handled) + + if tc.limitReadSize > 0 { + wantN := 0 + if tc.limitReadSize > size { + wantN = tc.limitReadSize - size + } + + if n := r.(*io.LimitedReader).N; n != int64(wantN) { + t.Errorf("r.N = %d, want %d", n, wantN) + } } } -func testSpliceReaderAtEOF(t *testing.T) { - clientUp, serverUp, err := spliceTestSocketPair("tcp") +func testSpliceReaderAtEOF(t *testing.T, upNet, downNet string) { + clientUp, serverUp, err := spliceTestSocketPair(upNet) if err != nil { t.Fatal(err) } defer clientUp.Close() - clientDown, serverDown, err := spliceTestSocketPair("tcp") + clientDown, serverDown, err := spliceTestSocketPair(downNet) if err != nil { t.Fatal(err) } @@ -265,7 +129,7 @@ func testSpliceReaderAtEOF(t *testing.T) { // get a goodbye signal. Test for the goodbye signal. msg := "bye" go func() { - serverDown.(*TCPConn).ReadFrom(serverUp) + serverDown.(io.ReaderFrom).ReadFrom(serverUp) io.WriteString(serverDown, msg) serverDown.Close() }() @@ -280,13 +144,13 @@ func testSpliceReaderAtEOF(t *testing.T) { } } -func testSpliceIssue25985(t *testing.T) { - front, err := newLocalListener("tcp") +func testSpliceIssue25985(t *testing.T, upNet, downNet string) { + front, err := newLocalListener(upNet) if err != nil { t.Fatal(err) } defer front.Close() - back, err := newLocalListener("tcp") + back, err := newLocalListener(downNet) if err != nil { t.Fatal(err) } @@ -300,7 +164,7 @@ func testSpliceIssue25985(t *testing.T) { if err != nil { return } - dst, err := Dial("tcp", back.Addr().String()) + dst, err := Dial(downNet, back.Addr().String()) if err != nil { return } @@ -318,7 +182,7 @@ func testSpliceIssue25985(t *testing.T) { go proxy() - toFront, err := Dial("tcp", front.Addr().String()) + toFront, err := Dial(upNet, front.Addr().String()) if err != nil { t.Fatal(err) } @@ -340,166 +204,71 @@ func testSpliceIssue25985(t *testing.T) { wg.Wait() } -func BenchmarkTCPReadFrom(b *testing.B) { +func BenchmarkSplice(b *testing.B) { testHookUninstaller.Do(uninstallTestHooks) - var chunkSizes []int - for i := uint(10); i <= 20; i++ { - chunkSizes = append(chunkSizes, 1< totalSize { + buf = buf[:totalSize-count] + } + + var err error + if n, err = fn(buf); err != nil { + return + } + } +}