From: Andy Pan Date: Thu, 19 Nov 2020 11:09:14 +0000 (+0800) Subject: internal/poll: implement a pipe pool for splice() call X-Git-Tag: go1.17beta1~1210 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=643d240a11b2d00e1718b02719707af0708e7519;p=gostls13.git internal/poll: implement a pipe pool for splice() call In scenarios where splice() is called, splice() is usually called not just once, but many times, which means that a lot of pipes will be created and destroyed frequently, costing an amount of system resources and slowing down performance, thus I suggest that we add a pipe pool for reusing pipes. Benchmark tests: goos: linux goarch: amd64 pkg: internal/poll cpu: AMD EPYC 7K62 48-Core Processor name old time/op new time/op delta SplicePipe-8 1.36µs ± 1% 0.02µs ± 0% -98.57% (p=0.001 n=7+7) SplicePipeParallel-8 747ns ± 4% 4ns ± 0% -99.41% (p=0.001 n=7+7) name old alloc/op new alloc/op delta SplicePipe-8 24.0B ± 0% 0.0B -100.00% (p=0.001 n=7+7) SplicePipeParallel-8 24.0B ± 0% 0.0B -100.00% (p=0.001 n=7+7) name old allocs/op new allocs/op delta SplicePipe-8 1.00 ± 0% 0.00 -100.00% (p=0.001 n=7+7) SplicePipeParallel-8 1.00 ± 0% 0.00 -100.00% (p=0.001 n=7+7) Fixes #42740 Change-Id: Idff654b7264342084e089b5ba796c87c380c471b Reviewed-on: https://go-review.googlesource.com/c/go/+/271537 Reviewed-by: Ian Lance Taylor Run-TryBot: Ian Lance Taylor TryBot-Result: Go Bot Trust: Brad Fitzpatrick --- diff --git a/src/internal/poll/export_linux_test.go b/src/internal/poll/export_linux_test.go new file mode 100644 index 0000000000..7fba793697 --- /dev/null +++ b/src/internal/poll/export_linux_test.go @@ -0,0 +1,22 @@ +// Copyright 2021 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Export guts for testing on linux. +// Since testing imports os and os imports internal/poll, +// the internal/poll tests can not be in package poll. + +package poll + +var ( + GetPipe = getPipe + PutPipe = putPipe + NewPipe = newPipe + DestroyPipe = destroyPipe +) + +func GetPipeFds(p *SplicePipe) (int, int) { + return p.rfd, p.wfd +} + +type SplicePipe = splicePipe diff --git a/src/internal/poll/splice_linux.go b/src/internal/poll/splice_linux.go index 968bc44a5f..971f754f43 100644 --- a/src/internal/poll/splice_linux.go +++ b/src/internal/poll/splice_linux.go @@ -6,6 +6,8 @@ package poll import ( "internal/syscall/unix" + "runtime" + "sync" "sync/atomic" "syscall" "unsafe" @@ -23,23 +25,23 @@ const ( // Splice transfers at most remain bytes of data from src to dst, using the // splice system call to minimize copies of data from and to userspace. // -// Splice creates a temporary pipe, to serve as a buffer for the data transfer. +// Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer. // src and dst must both be stream-oriented sockets. // // If err != nil, sc is the system call which caused the error. func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) { - prfd, pwfd, sc, err := newTempPipe() + p, sc, err := getPipe() if err != nil { return 0, false, sc, err } - defer destroyTempPipe(prfd, pwfd) + defer putPipe(p) var inPipe, n int for err == nil && remain > 0 { max := maxSpliceSize if int64(max) > remain { max = int(remain) } - inPipe, err = spliceDrain(pwfd, src, max) + inPipe, err = spliceDrain(p.wfd, src, max) // The operation is considered handled if splice returns no // error, or an error other than EINVAL. An EINVAL means the // kernel does not support splice for the socket type of src. @@ -55,10 +57,13 @@ func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, if err != nil || inPipe == 0 { break } - n, err = splicePump(dst, prfd, inPipe) + p.data += inPipe + + n, err = splicePump(dst, p.rfd, inPipe) if n > 0 { written += int64(n) remain -= int64(n) + p.data -= n } } if err != nil { @@ -149,13 +154,57 @@ func splice(out int, in int, max int, flags int) (int, error) { return int(n), err } +type splicePipe struct { + rfd int + wfd int + data int +} + +// splicePipePool caches pipes to avoid high frequency construction and destruction of pipe buffers. +// The garbage collector will free all pipes in the sync.Pool in periodically, thus we need to set up +// a finalizer for each pipe to close the its file descriptors before the actual GC. +var splicePipePool = sync.Pool{New: newPoolPipe} + +func newPoolPipe() interface{} { + // Discard the error which occurred during the creation of pipe buffer, + // redirecting the data transmission to the conventional way utilizing read() + write() as a fallback. + p := newPipe() + if p != nil { + runtime.SetFinalizer(p, destroyPipe) + } + return p +} + +// getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from cache. +// +// Note that it may fail to create a new pipe buffer by newPipe(), in which case getPipe() will return a generic error +// and system call name splice in string as the indication. +func getPipe() (*splicePipe, string, error) { + v := splicePipePool.Get() + if v == nil { + return nil, "splice", syscall.EINVAL + } + return v.(*splicePipe), "", nil +} + +func putPipe(p *splicePipe) { + // If there is still data left in the pipe, + // then close and discard it instead of putting it back into the pool. + if p.data != 0 { + runtime.SetFinalizer(p, nil) + destroyPipe(p) + return + } + splicePipePool.Put(p) +} + var disableSplice unsafe.Pointer -// newTempPipe sets up a temporary pipe for a splice operation. -func newTempPipe() (prfd, pwfd int, sc string, err error) { +// newPipe sets up a pipe for a splice operation. +func newPipe() (sp *splicePipe) { p := (*bool)(atomic.LoadPointer(&disableSplice)) if p != nil && *p { - return -1, -1, "splice", syscall.EINVAL + return nil } var fds [2]int @@ -165,9 +214,11 @@ func newTempPipe() (prfd, pwfd int, sc string, err error) { // closed. const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK if err := syscall.Pipe2(fds[:], flags); err != nil { - return -1, -1, "pipe2", err + return nil } + sp = &splicePipe{rfd: fds[0], wfd: fds[1]} + if p == nil { p = new(bool) defer atomic.StorePointer(&disableSplice, unsafe.Pointer(p)) @@ -175,20 +226,16 @@ func newTempPipe() (prfd, pwfd int, sc string, err error) { // F_GETPIPE_SZ was added in 2.6.35, which does not have the -EAGAIN bug. if _, _, errno := syscall.Syscall(unix.FcntlSyscall, uintptr(fds[0]), syscall.F_GETPIPE_SZ, 0); errno != 0 { *p = true - destroyTempPipe(fds[0], fds[1]) - return -1, -1, "fcntl", errno + destroyPipe(sp) + return nil } } - return fds[0], fds[1], "", nil + return } -// destroyTempPipe destroys a temporary pipe. -func destroyTempPipe(prfd, pwfd int) error { - err := CloseFunc(prfd) - err1 := CloseFunc(pwfd) - if err == nil { - return err1 - } - return err +// destroyPipe destroys a pipe. +func destroyPipe(p *splicePipe) { + CloseFunc(p.rfd) + CloseFunc(p.wfd) } diff --git a/src/internal/poll/splice_linux_test.go b/src/internal/poll/splice_linux_test.go new file mode 100644 index 0000000000..9ea5197242 --- /dev/null +++ b/src/internal/poll/splice_linux_test.go @@ -0,0 +1,96 @@ +// Copyright 2021 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll_test + +import ( + "internal/poll" + "runtime" + "syscall" + "testing" + "time" +) + +// checkPipes returns true if all pipes are closed properly, false otherwise. +func checkPipes(fds []int) bool { + for _, fd := range fds { + // Check if each pipe fd has been closed. + err := syscall.FcntlFlock(uintptr(fd), syscall.F_GETFD, nil) + if err == nil { + return false + } + } + return true +} + +func TestSplicePipePool(t *testing.T) { + const N = 64 + var ( + p *poll.SplicePipe + ps []*poll.SplicePipe + fds []int + err error + ) + for i := 0; i < N; i++ { + p, _, err = poll.GetPipe() + if err != nil { + t.Skip("failed to create pipe, skip this test") + } + prfd, pwfd := poll.GetPipeFds(p) + fds = append(fds, prfd, pwfd) + ps = append(ps, p) + } + for _, p = range ps { + poll.PutPipe(p) + } + ps = nil + + var ok bool + // Trigger garbage collection to free the pipes in sync.Pool and check whether or not + // those pipe buffers have been closed as we expected. + for i := 0; i < 5; i++ { + runtime.GC() + time.Sleep(time.Duration(i*100+10) * time.Millisecond) + if ok = checkPipes(fds); ok { + break + } + } + + if !ok { + t.Fatal("at least one pipe is still open") + } +} + +func BenchmarkSplicePipe(b *testing.B) { + b.Run("SplicePipeWithPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + p, _, _ := poll.GetPipe() + poll.PutPipe(p) + } + }) + b.Run("SplicePipeWithoutPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + p := poll.NewPipe() + poll.DestroyPipe(p) + } + }) +} + +func BenchmarkSplicePipePoolParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + p, _, _ := poll.GetPipe() + poll.PutPipe(p) + } + }) +} + +func BenchmarkSplicePipeNativeParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + p := poll.NewPipe() + poll.DestroyPipe(p) + } + }) +}