From f2316c27892b82ee414b0e43708edca2e7995468 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Andrei=20Tudor=20C=C4=83lin?= Date: Wed, 18 Apr 2018 10:56:06 +0200 Subject: [PATCH] net: add support for splice(2) in (*TCPConn).ReadFrom on Linux This change adds support for the splice system call on Linux, for the purpose of optimizing (*TCPConn).ReadFrom by reducing copies of data from and to userspace. It does so by creating a temporary pipe and splicing data from the source connection to the pipe, then from the pipe to the destination connection. The pipe serves as an in-kernel buffer for the data transfer. No new API is added to package net, but a new Splice function is added to package internal/poll, because using splice requires help from the network poller. Users of the net package should benefit from the change transparently. This change only enables the optimization if the Reader in ReadFrom is a TCP connection. Since splice is a more general interface, it could, in theory, also be enabled if the Reader were a unix socket, or the read half of a pipe. However, benchmarks show that enabling it for unix sockets is most likely not a net performance gain. The tcp <- unix case is also fairly unlikely to be used very much by users of package net. Enabling the optimization for pipes is also problematic from an implementation perspective, since package net cannot easily get at the *poll.FD of an *os.File. A possible solution to this would be to dup the pipe file descriptor, register the duped descriptor with the network poller, and work on that *poll.FD instead of the original. However, this seems too intrusive, so it has not been done. If there was a clean way to do it, it would probably be worth doing, since splicing from a pipe to a socket can be done directly. Therefore, this patch only enables the optimization for what is likely the most common use case: tcp <- tcp. The following benchmark compares the performance of the previous userspace genericReadFrom code path to the new optimized code path. The sub-benchmarks represent chunk sizes used by the writer on the other end of the Reader passed to ReadFrom. benchmark old ns/op new ns/op delta BenchmarkTCPReadFrom/1024-4 4727 4954 +4.80% BenchmarkTCPReadFrom/2048-4 4389 4301 -2.01% BenchmarkTCPReadFrom/4096-4 4606 4534 -1.56% BenchmarkTCPReadFrom/8192-4 5219 4779 -8.43% BenchmarkTCPReadFrom/16384-4 8708 8008 -8.04% BenchmarkTCPReadFrom/32768-4 16349 14973 -8.42% BenchmarkTCPReadFrom/65536-4 35246 27406 -22.24% BenchmarkTCPReadFrom/131072-4 72920 52382 -28.17% BenchmarkTCPReadFrom/262144-4 149311 95094 -36.31% BenchmarkTCPReadFrom/524288-4 306704 181856 -40.71% BenchmarkTCPReadFrom/1048576-4 674174 357406 -46.99% benchmark old MB/s new MB/s speedup BenchmarkTCPReadFrom/1024-4 216.62 206.69 0.95x BenchmarkTCPReadFrom/2048-4 466.61 476.08 1.02x BenchmarkTCPReadFrom/4096-4 889.09 903.31 1.02x BenchmarkTCPReadFrom/8192-4 1569.40 1714.06 1.09x BenchmarkTCPReadFrom/16384-4 1881.42 2045.84 1.09x BenchmarkTCPReadFrom/32768-4 2004.18 2188.41 1.09x BenchmarkTCPReadFrom/65536-4 1859.38 2391.25 1.29x BenchmarkTCPReadFrom/131072-4 1797.46 2502.21 1.39x BenchmarkTCPReadFrom/262144-4 1755.69 2756.68 1.57x BenchmarkTCPReadFrom/524288-4 1709.42 2882.98 1.69x BenchmarkTCPReadFrom/1048576-4 1555.35 2933.84 1.89x Fixes #10948 Change-Id: I3ce27f21f7adda8b696afdc48a91149998ae16a5 Reviewed-on: https://go-review.googlesource.com/107715 Run-TryBot: Brad Fitzpatrick Run-TryBot: Ian Lance Taylor TryBot-Result: Gobot Gobot Reviewed-by: Ian Lance Taylor --- src/internal/poll/splice_linux.go | 184 +++++++++++++ src/net/splice_linux.go | 35 +++ src/net/splice_stub.go | 13 + src/net/splice_test.go | 426 ++++++++++++++++++++++++++++++ src/net/tcpsock_posix.go | 3 + 5 files changed, 661 insertions(+) create mode 100644 src/internal/poll/splice_linux.go create mode 100644 src/net/splice_linux.go create mode 100644 src/net/splice_stub.go create mode 100644 src/net/splice_test.go diff --git a/src/internal/poll/splice_linux.go b/src/internal/poll/splice_linux.go new file mode 100644 index 0000000000..7ebd548a97 --- /dev/null +++ b/src/internal/poll/splice_linux.go @@ -0,0 +1,184 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import "syscall" + +const ( + // spliceNonblock makes calls to splice(2) non-blocking. + spliceNonblock = 0x2 + + // maxSpliceSize is the maximum amount of data Splice asks + // the kernel to move in a single call to splice(2). + maxSpliceSize = 4 << 20 +) + +// Splice transfers at most remain bytes of data from src to dst, using the +// splice system call to minimize copies of data from and to userspace. +// +// Splice creates a temporary pipe, to serve as a buffer for the data transfer. +// src and dst must both be stream-oriented sockets. +// +// If err != nil, sc is the system call which caused the error. +func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) { + prfd, pwfd, sc, err := newTempPipe() + if err != nil { + return 0, false, sc, err + } + defer destroyTempPipe(prfd, pwfd) + // From here on, the operation should be considered handled, + // even if Splice doesn't transfer any data. + if err := src.readLock(); err != nil { + return 0, true, "splice", err + } + defer src.readUnlock() + if err := dst.writeLock(); err != nil { + return 0, true, "splice", err + } + defer dst.writeUnlock() + if err := src.pd.prepareRead(src.isFile); err != nil { + return 0, true, "splice", err + } + if err := dst.pd.prepareWrite(dst.isFile); err != nil { + return 0, true, "splice", err + } + var inPipe, n int + for err == nil && remain > 0 { + max := maxSpliceSize + if int64(max) > remain { + max = int(remain) + } + inPipe, err = spliceDrain(pwfd, src, max) + // spliceDrain should never return EAGAIN, so if err != nil, + // Splice cannot continue. If inPipe == 0 && err == nil, + // src is at EOF, and the transfer is complete. + if err != nil || (inPipe == 0 && err == nil) { + break + } + n, err = splicePump(dst, prfd, inPipe) + if n > 0 { + written += int64(n) + remain -= int64(n) + } + } + if err != nil { + return written, true, "splice", err + } + return written, true, "", nil +} + +// spliceDrain moves data from a socket to a pipe. +// +// Invariant: when entering spliceDrain, the pipe is empty. It is either in its +// initial state, or splicePump has emptied it previously. +// +// Given this, spliceDrain can reasonably assume that the pipe is ready for +// writing, so if splice returns EAGAIN, it must be because the socket is not +// ready for reading. +// +// If spliceDrain returns (0, nil), src is at EOF. +func spliceDrain(pipefd int, sock *FD, max int) (int, error) { + for { + n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock) + if err != syscall.EAGAIN { + return n, err + } + if err := sock.pd.waitRead(sock.isFile); err != nil { + return n, err + } + } +} + +// splicePump moves all the buffered data from a pipe to a socket. +// +// Invariant: when entering splicePump, there are exactly inPipe +// bytes of data in the pipe, from a previous call to spliceDrain. +// +// By analogy to the condition from spliceDrain, splicePump +// only needs to poll the socket for readiness, if splice returns +// EAGAIN. +// +// If splicePump cannot move all the data in a single call to +// splice(2), it loops over the buffered data until it has written +// all of it to the socket. This behavior is similar to the Write +// step of an io.Copy in userspace. +func splicePump(sock *FD, pipefd int, inPipe int) (int, error) { + written := 0 + for inPipe > 0 { + n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock) + // Here, the condition n == 0 && err == nil should never be + // observed, since Splice controls the write side of the pipe. + if n > 0 { + inPipe -= n + written += n + continue + } + if err != syscall.EAGAIN { + return written, err + } + if err := sock.pd.waitWrite(sock.isFile); err != nil { + return written, err + } + } + return written, nil +} + +// splice wraps the splice system call. Since the current implementation +// only uses splice on sockets and pipes, the offset arguments are unused. +// splice returns int instead of int64, because callers never ask it to +// move more data in a single call than can fit in an int32. +func splice(out int, in int, max int, flags int) (int, error) { + n, err := syscall.Splice(in, nil, out, nil, max, flags) + return int(n), err +} + +// newTempPipe sets up a temporary pipe for a splice operation. +func newTempPipe() (prfd, pwfd int, sc string, err error) { + var fds [2]int + const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK + if err := syscall.Pipe2(fds[:], flags); err != nil { + // pipe2 was added in 2.6.27 and our minimum requirement + // is 2.6.23, so it might not be implemented. + if err == syscall.ENOSYS { + return newTempPipeFallback(fds[:]) + } + return -1, -1, "pipe2", err + } + return fds[0], fds[1], "", nil +} + +// newTempPipeFallback is a fallback for newTempPipe, for systems +// which do not support pipe2. +func newTempPipeFallback(fds []int) (prfd, pwfd int, sc string, err error) { + syscall.ForkLock.RLock() + defer syscall.ForkLock.RUnlock() + if err := syscall.Pipe(fds); err != nil { + return -1, -1, "pipe", err + } + prfd, pwfd = fds[0], fds[1] + syscall.CloseOnExec(prfd) + syscall.CloseOnExec(pwfd) + if err := syscall.SetNonblock(prfd, true); err != nil { + CloseFunc(prfd) + CloseFunc(pwfd) + return -1, -1, "setnonblock", err + } + if err := syscall.SetNonblock(pwfd, true); err != nil { + CloseFunc(prfd) + CloseFunc(pwfd) + return -1, -1, "setnonblock", err + } + return prfd, pwfd, "", nil +} + +// destroyTempPipe destroys a temporary pipe. +func destroyTempPipe(prfd, pwfd int) error { + err := CloseFunc(prfd) + err1 := CloseFunc(pwfd) + if err == nil { + return err1 + } + return err +} diff --git a/src/net/splice_linux.go b/src/net/splice_linux.go new file mode 100644 index 0000000000..b055f93351 --- /dev/null +++ b/src/net/splice_linux.go @@ -0,0 +1,35 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package net + +import ( + "internal/poll" + "io" +) + +// splice transfers data from r to c using the splice system call to minimize +// copies from and to userspace. c must be a TCP connection. Currently, splice +// is only enabled if r is also a TCP connection. +// +// If splice returns handled == false, it has performed no work. +func splice(c *netFD, r io.Reader) (written int64, err error, handled bool) { + var remain int64 = 1 << 62 // by default, copy until EOF + lr, ok := r.(*io.LimitedReader) + if ok { + remain, r = lr.N, lr.R + if remain <= 0 { + return 0, nil, true + } + } + s, ok := r.(*TCPConn) + if !ok { + return 0, nil, false + } + written, handled, sc, err := poll.Splice(&c.pfd, &s.fd.pfd, remain) + if lr != nil { + lr.N -= written + } + return written, wrapSyscallError(sc, err), handled +} diff --git a/src/net/splice_stub.go b/src/net/splice_stub.go new file mode 100644 index 0000000000..9106cb2c18 --- /dev/null +++ b/src/net/splice_stub.go @@ -0,0 +1,13 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !linux + +package net + +import "io" + +func splice(c *netFD, r io.Reader) (int64, error, bool) { + return 0, nil, false +} diff --git a/src/net/splice_test.go b/src/net/splice_test.go new file mode 100644 index 0000000000..483a9e555f --- /dev/null +++ b/src/net/splice_test.go @@ -0,0 +1,426 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build linux + +package net + +import ( + "bytes" + "fmt" + "io" + "testing" +) + +func TestSplice(t *testing.T) { + t.Run("simple", testSpliceSimple) + t.Run("multipleWrite", testSpliceMultipleWrite) + t.Run("big", testSpliceBig) + t.Run("honorsLimitedReader", testSpliceHonorsLimitedReader) + t.Run("readerAtEOF", testSpliceReaderAtEOF) +} + +func testSpliceSimple(t *testing.T) { + srv, err := newSpliceTestServer() + if err != nil { + t.Fatal(err) + } + defer srv.Close() + copyDone := srv.Copy() + msg := []byte("splice test") + if _, err := srv.Write(msg); err != nil { + t.Fatal(err) + } + got := make([]byte, len(msg)) + if _, err := io.ReadFull(srv, got); err != nil { + t.Fatal(err) + } + if !bytes.Equal(got, msg) { + t.Errorf("got %q, wrote %q", got, msg) + } + srv.CloseWrite() + srv.CloseRead() + if err := <-copyDone; err != nil { + t.Errorf("splice: %v", err) + } +} + +func testSpliceMultipleWrite(t *testing.T) { + srv, err := newSpliceTestServer() + if err != nil { + t.Fatal(err) + } + defer srv.Close() + copyDone := srv.Copy() + msg1 := []byte("splice test part 1 ") + msg2 := []byte(" splice test part 2") + if _, err := srv.Write(msg1); err != nil { + t.Fatalf("Write: %v", err) + } + if _, err := srv.Write(msg2); err != nil { + t.Fatal(err) + } + got := make([]byte, len(msg1)+len(msg2)) + if _, err := io.ReadFull(srv, got); err != nil { + t.Fatal(err) + } + want := append(msg1, msg2...) + if !bytes.Equal(got, want) { + t.Errorf("got %q, wrote %q", got, want) + } + srv.CloseWrite() + srv.CloseRead() + if err := <-copyDone; err != nil { + t.Errorf("splice: %v", err) + } +} + +func testSpliceBig(t *testing.T) { + size := 1<<31 - 1 + if testing.Short() { + size = 1 << 25 + } + srv, err := newSpliceTestServer() + if err != nil { + t.Fatal(err) + } + defer srv.Close() + big := make([]byte, size) + copyDone := srv.Copy() + type readResult struct { + b []byte + err error + } + readDone := make(chan readResult) + go func() { + got := make([]byte, len(big)) + _, err := io.ReadFull(srv, got) + readDone <- readResult{got, err} + }() + if _, err := srv.Write(big); err != nil { + t.Fatal(err) + } + res := <-readDone + if res.err != nil { + t.Fatal(res.err) + } + got := res.b + if !bytes.Equal(got, big) { + t.Errorf("input and output differ") + } + srv.CloseWrite() + srv.CloseRead() + if err := <-copyDone; err != nil { + t.Errorf("splice: %v", err) + } +} + +func testSpliceHonorsLimitedReader(t *testing.T) { + t.Run("stopsAfterN", testSpliceStopsAfterN) + t.Run("updatesN", testSpliceUpdatesN) +} + +func testSpliceStopsAfterN(t *testing.T) { + clientUp, serverUp, err := spliceTestSocketPair("tcp") + if err != nil { + t.Fatal(err) + } + defer clientUp.Close() + defer serverUp.Close() + clientDown, serverDown, err := spliceTestSocketPair("tcp") + if err != nil { + t.Fatal(err) + } + defer clientDown.Close() + defer serverDown.Close() + count := 128 + copyDone := make(chan error) + lr := &io.LimitedReader{ + N: int64(count), + R: serverUp, + } + go func() { + _, err := io.Copy(serverDown, lr) + serverDown.Close() + copyDone <- err + }() + msg := make([]byte, 2*count) + if _, err := clientUp.Write(msg); err != nil { + t.Fatal(err) + } + clientUp.Close() + var buf bytes.Buffer + if _, err := io.Copy(&buf, clientDown); err != nil { + t.Fatal(err) + } + if buf.Len() != count { + t.Errorf("splice transferred %d bytes, want to stop after %d", buf.Len(), count) + } + clientDown.Close() + if err := <-copyDone; err != nil { + t.Errorf("splice: %v", err) + } +} + +func testSpliceUpdatesN(t *testing.T) { + clientUp, serverUp, err := spliceTestSocketPair("tcp") + if err != nil { + t.Fatal(err) + } + defer clientUp.Close() + defer serverUp.Close() + clientDown, serverDown, err := spliceTestSocketPair("tcp") + if err != nil { + t.Fatal(err) + } + defer clientDown.Close() + defer serverDown.Close() + count := 128 + copyDone := make(chan error) + lr := &io.LimitedReader{ + N: int64(100 + count), + R: serverUp, + } + go func() { + _, err := io.Copy(serverDown, lr) + copyDone <- err + }() + msg := make([]byte, count) + if _, err := clientUp.Write(msg); err != nil { + t.Fatal(err) + } + clientUp.Close() + got := make([]byte, count) + if _, err := io.ReadFull(clientDown, got); err != nil { + t.Fatal(err) + } + clientDown.Close() + if err := <-copyDone; err != nil { + t.Errorf("splice: %v", err) + } + wantN := int64(100) + if lr.N != wantN { + t.Errorf("lr.N = %d, want %d", lr.N, wantN) + } +} + +func testSpliceReaderAtEOF(t *testing.T) { + clientUp, serverUp, err := spliceTestSocketPair("tcp") + if err != nil { + t.Fatal(err) + } + defer clientUp.Close() + defer serverUp.Close() + clientDown, serverDown, err := spliceTestSocketPair("tcp") + if err != nil { + t.Fatal(err) + } + defer clientDown.Close() + defer serverDown.Close() + + serverUp.Close() + _, err, handled := splice(serverDown.(*TCPConn).fd, serverUp) + if !handled { + t.Errorf("closed connection: got err = %v, handled = %t, want handled = true", err, handled) + } + lr := &io.LimitedReader{ + N: 0, + R: serverUp, + } + _, err, handled = splice(serverDown.(*TCPConn).fd, lr) + if !handled { + t.Errorf("exhausted LimitedReader: got err = %v, handled = %t, want handled = true", err, handled) + } +} + +func BenchmarkTCPReadFrom(b *testing.B) { + testHookUninstaller.Do(uninstallTestHooks) + + var chunkSizes []int + for i := uint(10); i <= 20; i++ { + chunkSizes = append(chunkSizes, 1<