From: Sébastien Paolacci Date: Wed, 26 Sep 2012 19:32:59 +0000 (-0400) Subject: net: spread fd over several pollservers. X-Git-Tag: go1.1rc2~2308 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=7014bc64b1be8f85fff75ec13f8597b6a6aed366;p=gostls13.git net: spread fd over several pollservers. Lighten contention without preventing further improvements on pollservers. Connections are spread over Min(GOMAXPROCS, NumCPU, 8) pollserver instances. Median of 10 runs, 4 cores @ 3.4GHz, amd/linux-3.2: BenchmarkTCPOneShot 171917 ns/op 175194 ns/op 1.91% BenchmarkTCPOneShot-2 101413 ns/op 109462 ns/op 7.94% BenchmarkTCPOneShot-4 91796 ns/op 35712 ns/op -61.10% BenchmarkTCPOneShot-6 90938 ns/op 30607 ns/op -66.34% BenchmarkTCPOneShot-8 90374 ns/op 29150 ns/op -67.75% BenchmarkTCPOneShot-16 101089 ns/op 111526 ns/op 10.32% BenchmarkTCPOneShotTimeout 174986 ns/op 178606 ns/op 2.07% BenchmarkTCPOneShotTimeout-2 101585 ns/op 110678 ns/op 8.95% BenchmarkTCPOneShotTimeout-4 91547 ns/op 35931 ns/op -60.75% BenchmarkTCPOneShotTimeout-6 91496 ns/op 31019 ns/op -66.10% BenchmarkTCPOneShotTimeout-8 90670 ns/op 29531 ns/op -67.43% BenchmarkTCPOneShotTimeout-16 101013 ns/op 106026 ns/op 4.96% BenchmarkTCPPersistent 51731 ns/op 53324 ns/op 3.08% BenchmarkTCPPersistent-2 32888 ns/op 30678 ns/op -6.72% BenchmarkTCPPersistent-4 25751 ns/op 15595 ns/op -39.44% BenchmarkTCPPersistent-6 26737 ns/op 9805 ns/op -63.33% BenchmarkTCPPersistent-8 26850 ns/op 9730 ns/op -63.76% BenchmarkTCPPersistent-16 104449 ns/op 102838 ns/op -1.54% BenchmarkTCPPersistentTimeout 51806 ns/op 53281 ns/op 2.85% BenchmarkTCPPersistentTimeout-2 32956 ns/op 30895 ns/op -6.25% BenchmarkTCPPersistentTimeout-4 25994 ns/op 18111 ns/op -30.33% BenchmarkTCPPersistentTimeout-6 26679 ns/op 9846 ns/op -63.09% BenchmarkTCPPersistentTimeout-8 26810 ns/op 9727 ns/op -63.72% BenchmarkTCPPersistentTimeout-16 101652 ns/op 104410 ns/op 2.71% R=rsc, dvyukov, dave, mikioh.mikioh, bradfitz, remyoudompheng CC=golang-dev https://golang.org/cl/6496054 --- diff --git a/src/pkg/net/fd_unix.go b/src/pkg/net/fd_unix.go index 57e04bb6ca..c55f3362f0 100644 --- a/src/pkg/net/fd_unix.go +++ b/src/pkg/net/fd_unix.go @@ -10,6 +10,7 @@ import ( "errors" "io" "os" + "runtime" "sync" "syscall" "time" @@ -45,6 +46,9 @@ type netFD struct { // owned by fd wait server ncr, ncw int + + // wait server + pollServer *pollServer } // A pollServer helps FDs determine when to retry a non-blocking @@ -255,21 +259,45 @@ func (s *pollServer) WaitWrite(fd *netFD) error { } // Network FD methods. -// All the network FDs use a single pollServer. +// Spread network FDs over several pollServers. + +var pollMaxN int +var pollservers []*pollServer +var startServersOnce []func() -var pollserver *pollServer -var onceStartServer sync.Once +func init() { + pollMaxN = runtime.NumCPU() + if pollMaxN > 8 { + pollMaxN = 8 // No improvement then. + } + pollservers = make([]*pollServer, pollMaxN) + startServersOnce = make([]func(), pollMaxN) + for i := 0; i < pollMaxN; i++ { + k := i + once := new(sync.Once) + startServersOnce[i] = func() { once.Do(func() { startServer(k) }) } + } +} -func startServer() { +func startServer(k int) { p, err := newPollServer() if err != nil { - print("Start pollServer: ", err.Error(), "\n") + panic(err) + } + pollservers[k] = p +} + +func server(fd int) *pollServer { + pollN := runtime.GOMAXPROCS(0) + if pollN > pollMaxN { + pollN = pollMaxN } - pollserver = p + k := fd % pollN + startServersOnce[k]() + return pollservers[k] } func newFD(fd, family, sotype int, net string) (*netFD, error) { - onceStartServer.Do(startServer) if err := syscall.SetNonblock(fd, true); err != nil { return nil, err } @@ -281,6 +309,7 @@ func newFD(fd, family, sotype int, net string) (*netFD, error) { } netfd.cr = make(chan error, 1) netfd.cw = make(chan error, 1) + netfd.pollServer = server(fd) return netfd, nil } @@ -300,7 +329,7 @@ func (fd *netFD) setAddr(laddr, raddr Addr) { func (fd *netFD) connect(ra syscall.Sockaddr) error { err := syscall.Connect(fd.sysfd, ra) if err == syscall.EINPROGRESS { - if err = pollserver.WaitWrite(fd); err != nil { + if err = fd.pollServer.WaitWrite(fd); err != nil { return err } var e int @@ -354,8 +383,8 @@ func (fd *netFD) decref() { } func (fd *netFD) Close() error { - pollserver.Lock() // needed for both fd.incref(true) and pollserver.Evict - defer pollserver.Unlock() + fd.pollServer.Lock() // needed for both fd.incref(true) and pollserver.Evict + defer fd.pollServer.Unlock() if err := fd.incref(true); err != nil { return err } @@ -364,7 +393,7 @@ func (fd *netFD) Close() error { // the final decref will close fd.sysfd. This should happen // fairly quickly, since all the I/O is non-blocking, and any // attempts to block in the pollserver will return errClosing. - pollserver.Evict(fd) + fd.pollServer.Evict(fd) fd.decref() return nil } @@ -401,7 +430,7 @@ func (fd *netFD) Read(p []byte) (n int, err error) { if err == syscall.EAGAIN { err = errTimeout if fd.rdeadline >= 0 { - if err = pollserver.WaitRead(fd); err == nil { + if err = fd.pollServer.WaitRead(fd); err == nil { continue } } @@ -431,7 +460,7 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) { if err == syscall.EAGAIN { err = errTimeout if fd.rdeadline >= 0 { - if err = pollserver.WaitRead(fd); err == nil { + if err = fd.pollServer.WaitRead(fd); err == nil { continue } } @@ -459,7 +488,7 @@ func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S if err == syscall.EAGAIN { err = errTimeout if fd.rdeadline >= 0 { - if err = pollserver.WaitRead(fd); err == nil { + if err = fd.pollServer.WaitRead(fd); err == nil { continue } } @@ -501,7 +530,7 @@ func (fd *netFD) Write(p []byte) (int, error) { if err == syscall.EAGAIN { err = errTimeout if fd.wdeadline >= 0 { - if err = pollserver.WaitWrite(fd); err == nil { + if err = fd.pollServer.WaitWrite(fd); err == nil { continue } } @@ -533,7 +562,7 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) { if err == syscall.EAGAIN { err = errTimeout if fd.wdeadline >= 0 { - if err = pollserver.WaitWrite(fd); err == nil { + if err = fd.pollServer.WaitWrite(fd); err == nil { continue } } @@ -560,7 +589,7 @@ func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob if err == syscall.EAGAIN { err = errTimeout if fd.wdeadline >= 0 { - if err = pollserver.WaitWrite(fd); err == nil { + if err = fd.pollServer.WaitWrite(fd); err == nil { continue } } @@ -595,7 +624,7 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err e if err == syscall.EAGAIN { err = errTimeout if fd.rdeadline >= 0 { - if err = pollserver.WaitRead(fd); err == nil { + if err = fd.pollServer.WaitRead(fd); err == nil { continue } } diff --git a/src/pkg/net/sendfile_freebsd.go b/src/pkg/net/sendfile_freebsd.go index 40afdee96d..8500006104 100644 --- a/src/pkg/net/sendfile_freebsd.go +++ b/src/pkg/net/sendfile_freebsd.go @@ -83,7 +83,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { break } if err1 == syscall.EAGAIN && c.wdeadline >= 0 { - if err1 = pollserver.WaitWrite(c); err1 == nil { + if err1 = c.pollServer.WaitWrite(c); err1 == nil { continue } } diff --git a/src/pkg/net/sendfile_linux.go b/src/pkg/net/sendfile_linux.go index a0d5303626..5ee18f9ccc 100644 --- a/src/pkg/net/sendfile_linux.go +++ b/src/pkg/net/sendfile_linux.go @@ -59,7 +59,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { break } if err1 == syscall.EAGAIN && c.wdeadline >= 0 { - if err1 = pollserver.WaitWrite(c); err1 == nil { + if err1 = c.pollServer.WaitWrite(c); err1 == nil { continue } }