]> Cypherpunks repositories - gostls13.git/commitdiff
internal/poll: implement a pipe pool for splice() call
authorAndy Pan <panjf2000@gmail.com>
Thu, 19 Nov 2020 11:09:14 +0000 (19:09 +0800)
committerIan Lance Taylor <iant@golang.org>
Wed, 10 Mar 2021 03:52:48 +0000 (03:52 +0000)
In scenarios where splice() is called, splice() is usually called not just once, but many times,
which means that a lot of pipes will be created and destroyed frequently, costing an amount of system resources
and slowing down performance, thus I suggest that we add a pipe pool for reusing pipes.

Benchmark tests:

goos: linux
goarch: amd64
pkg: internal/poll
cpu: AMD EPYC 7K62 48-Core Processor

name                  old time/op    new time/op    delta
SplicePipe-8            1.36µs ± 1%    0.02µs ± 0%   -98.57%  (p=0.001 n=7+7)
SplicePipeParallel-8     747ns ± 4%       4ns ± 0%   -99.41%  (p=0.001 n=7+7)

name                  old alloc/op   new alloc/op   delta
SplicePipe-8             24.0B ± 0%      0.0B       -100.00%  (p=0.001 n=7+7)
SplicePipeParallel-8     24.0B ± 0%      0.0B       -100.00%  (p=0.001 n=7+7)

name                  old allocs/op  new allocs/op  delta
SplicePipe-8              1.00 ± 0%      0.00       -100.00%  (p=0.001 n=7+7)
SplicePipeParallel-8      1.00 ± 0%      0.00       -100.00%  (p=0.001 n=7+7)

Fixes #42740

Change-Id: Idff654b7264342084e089b5ba796c87c380c471b
Reviewed-on: https://go-review.googlesource.com/c/go/+/271537
Reviewed-by: Ian Lance Taylor <iant@golang.org>
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Go Bot <gobot@golang.org>
Trust: Brad Fitzpatrick <bradfitz@golang.org>

src/internal/poll/export_linux_test.go [new file with mode: 0644]
src/internal/poll/splice_linux.go
src/internal/poll/splice_linux_test.go [new file with mode: 0644]

diff --git a/src/internal/poll/export_linux_test.go b/src/internal/poll/export_linux_test.go
new file mode 100644 (file)
index 0000000..7fba793
--- /dev/null
@@ -0,0 +1,22 @@
+// Copyright 2021 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Export guts for testing on linux.
+// Since testing imports os and os imports internal/poll,
+// the internal/poll tests can not be in package poll.
+
+package poll
+
+var (
+       GetPipe     = getPipe
+       PutPipe     = putPipe
+       NewPipe     = newPipe
+       DestroyPipe = destroyPipe
+)
+
+func GetPipeFds(p *SplicePipe) (int, int) {
+       return p.rfd, p.wfd
+}
+
+type SplicePipe = splicePipe
index 968bc44a5ffd4d55e419897f73dc6673e6ce3c5f..971f754f43cdf65baabbf5dd09c224f5310ea81c 100644 (file)
@@ -6,6 +6,8 @@ package poll
 
 import (
        "internal/syscall/unix"
+       "runtime"
+       "sync"
        "sync/atomic"
        "syscall"
        "unsafe"
@@ -23,23 +25,23 @@ const (
 // Splice transfers at most remain bytes of data from src to dst, using the
 // splice system call to minimize copies of data from and to userspace.
 //
-// Splice creates a temporary pipe, to serve as a buffer for the data transfer.
+// Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer.
 // src and dst must both be stream-oriented sockets.
 //
 // If err != nil, sc is the system call which caused the error.
 func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) {
-       prfd, pwfd, sc, err := newTempPipe()
+       p, sc, err := getPipe()
        if err != nil {
                return 0, false, sc, err
        }
-       defer destroyTempPipe(prfd, pwfd)
+       defer putPipe(p)
        var inPipe, n int
        for err == nil && remain > 0 {
                max := maxSpliceSize
                if int64(max) > remain {
                        max = int(remain)
                }
-               inPipe, err = spliceDrain(pwfd, src, max)
+               inPipe, err = spliceDrain(p.wfd, src, max)
                // The operation is considered handled if splice returns no
                // error, or an error other than EINVAL. An EINVAL means the
                // kernel does not support splice for the socket type of src.
@@ -55,10 +57,13 @@ func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string,
                if err != nil || inPipe == 0 {
                        break
                }
-               n, err = splicePump(dst, prfd, inPipe)
+               p.data += inPipe
+
+               n, err = splicePump(dst, p.rfd, inPipe)
                if n > 0 {
                        written += int64(n)
                        remain -= int64(n)
+                       p.data -= n
                }
        }
        if err != nil {
@@ -149,13 +154,57 @@ func splice(out int, in int, max int, flags int) (int, error) {
        return int(n), err
 }
 
+type splicePipe struct {
+       rfd  int
+       wfd  int
+       data int
+}
+
+// splicePipePool caches pipes to avoid high frequency construction and destruction of pipe buffers.
+// The garbage collector will free all pipes in the sync.Pool in periodically, thus we need to set up
+// a finalizer for each pipe to close the its file descriptors before the actual GC.
+var splicePipePool = sync.Pool{New: newPoolPipe}
+
+func newPoolPipe() interface{} {
+       // Discard the error which occurred during the creation of pipe buffer,
+       // redirecting the data transmission to the conventional way utilizing read() + write() as a fallback.
+       p := newPipe()
+       if p != nil {
+               runtime.SetFinalizer(p, destroyPipe)
+       }
+       return p
+}
+
+// getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from cache.
+//
+// Note that it may fail to create a new pipe buffer by newPipe(), in which case getPipe() will return a generic error
+// and system call name splice in string as the indication.
+func getPipe() (*splicePipe, string, error) {
+       v := splicePipePool.Get()
+       if v == nil {
+               return nil, "splice", syscall.EINVAL
+       }
+       return v.(*splicePipe), "", nil
+}
+
+func putPipe(p *splicePipe) {
+       // If there is still data left in the pipe,
+       // then close and discard it instead of putting it back into the pool.
+       if p.data != 0 {
+               runtime.SetFinalizer(p, nil)
+               destroyPipe(p)
+               return
+       }
+       splicePipePool.Put(p)
+}
+
 var disableSplice unsafe.Pointer
 
-// newTempPipe sets up a temporary pipe for a splice operation.
-func newTempPipe() (prfd, pwfd int, sc string, err error) {
+// newPipe sets up a pipe for a splice operation.
+func newPipe() (sp *splicePipe) {
        p := (*bool)(atomic.LoadPointer(&disableSplice))
        if p != nil && *p {
-               return -1, -1, "splice", syscall.EINVAL
+               return nil
        }
 
        var fds [2]int
@@ -165,9 +214,11 @@ func newTempPipe() (prfd, pwfd int, sc string, err error) {
        // closed.
        const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK
        if err := syscall.Pipe2(fds[:], flags); err != nil {
-               return -1, -1, "pipe2", err
+               return nil
        }
 
+       sp = &splicePipe{rfd: fds[0], wfd: fds[1]}
+
        if p == nil {
                p = new(bool)
                defer atomic.StorePointer(&disableSplice, unsafe.Pointer(p))
@@ -175,20 +226,16 @@ func newTempPipe() (prfd, pwfd int, sc string, err error) {
                // F_GETPIPE_SZ was added in 2.6.35, which does not have the -EAGAIN bug.
                if _, _, errno := syscall.Syscall(unix.FcntlSyscall, uintptr(fds[0]), syscall.F_GETPIPE_SZ, 0); errno != 0 {
                        *p = true
-                       destroyTempPipe(fds[0], fds[1])
-                       return -1, -1, "fcntl", errno
+                       destroyPipe(sp)
+                       return nil
                }
        }
 
-       return fds[0], fds[1], "", nil
+       return
 }
 
-// destroyTempPipe destroys a temporary pipe.
-func destroyTempPipe(prfd, pwfd int) error {
-       err := CloseFunc(prfd)
-       err1 := CloseFunc(pwfd)
-       if err == nil {
-               return err1
-       }
-       return err
+// destroyPipe destroys a pipe.
+func destroyPipe(p *splicePipe) {
+       CloseFunc(p.rfd)
+       CloseFunc(p.wfd)
 }
diff --git a/src/internal/poll/splice_linux_test.go b/src/internal/poll/splice_linux_test.go
new file mode 100644 (file)
index 0000000..9ea5197
--- /dev/null
@@ -0,0 +1,96 @@
+// Copyright 2021 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package poll_test
+
+import (
+       "internal/poll"
+       "runtime"
+       "syscall"
+       "testing"
+       "time"
+)
+
+// checkPipes returns true if all pipes are closed properly, false otherwise.
+func checkPipes(fds []int) bool {
+       for _, fd := range fds {
+               // Check if each pipe fd has been closed.
+               err := syscall.FcntlFlock(uintptr(fd), syscall.F_GETFD, nil)
+               if err == nil {
+                       return false
+               }
+       }
+       return true
+}
+
+func TestSplicePipePool(t *testing.T) {
+       const N = 64
+       var (
+               p   *poll.SplicePipe
+               ps  []*poll.SplicePipe
+               fds []int
+               err error
+       )
+       for i := 0; i < N; i++ {
+               p, _, err = poll.GetPipe()
+               if err != nil {
+                       t.Skip("failed to create pipe, skip this test")
+               }
+               prfd, pwfd := poll.GetPipeFds(p)
+               fds = append(fds, prfd, pwfd)
+               ps = append(ps, p)
+       }
+       for _, p = range ps {
+               poll.PutPipe(p)
+       }
+       ps = nil
+
+       var ok bool
+       // Trigger garbage collection to free the pipes in sync.Pool and check whether or not
+       // those pipe buffers have been closed as we expected.
+       for i := 0; i < 5; i++ {
+               runtime.GC()
+               time.Sleep(time.Duration(i*100+10) * time.Millisecond)
+               if ok = checkPipes(fds); ok {
+                       break
+               }
+       }
+
+       if !ok {
+               t.Fatal("at least one pipe is still open")
+       }
+}
+
+func BenchmarkSplicePipe(b *testing.B) {
+       b.Run("SplicePipeWithPool", func(b *testing.B) {
+               for i := 0; i < b.N; i++ {
+                       p, _, _ := poll.GetPipe()
+                       poll.PutPipe(p)
+               }
+       })
+       b.Run("SplicePipeWithoutPool", func(b *testing.B) {
+               for i := 0; i < b.N; i++ {
+                       p := poll.NewPipe()
+                       poll.DestroyPipe(p)
+               }
+       })
+}
+
+func BenchmarkSplicePipePoolParallel(b *testing.B) {
+       b.RunParallel(func(pb *testing.PB) {
+               for pb.Next() {
+                       p, _, _ := poll.GetPipe()
+                       poll.PutPipe(p)
+               }
+       })
+}
+
+func BenchmarkSplicePipeNativeParallel(b *testing.B) {
+       b.RunParallel(func(pb *testing.PB) {
+               for pb.Next() {
+                       p := poll.NewPipe()
+                       poll.DestroyPipe(p)
+               }
+       })
+}