From b000f2286c3c9d462cf390084e1ea4ae16649fe9 Mon Sep 17 00:00:00 2001 From: Dmitriy Vyukov Date: Thu, 7 Mar 2013 21:44:24 +0400 Subject: [PATCH] net: more refactoring in preparation for runtime integrated pollster Move pollServer from fd_unix.go to fd_poll_unix.go. Add pollServerInit(*NetFD) to allow custom initialization. Add pollServer.Close(*NetFD) to allow custom finalization. Move setDeadline() to fd_poll_unix.go to allow custom handling of deadlines. Move newPollServer() to fd_poll_unix.go to allow custom initialization. No logical code changes. The next step will be to turn off fd_poll_unix.go for some platform (I have changes for darwin/linux) and redirect it into runtime. See: https://golang.org/cl/7569043/diff/2001/src/pkg/net/fd_poll_runtime.go R=golang-dev, bradfitz CC=golang-dev https://golang.org/cl/7513045 --- src/pkg/net/fd_poll_unix.go | 327 ++++++++++++++++++++++++++++++ src/pkg/net/fd_unix.go | 263 +----------------------- src/pkg/net/newpollserver_unix.go | 46 ----- src/pkg/net/sock_posix.go | 8 +- src/pkg/net/sockopt_posix.go | 19 -- src/pkg/net/sockopt_windows.go | 19 ++ 6 files changed, 356 insertions(+), 326 deletions(-) create mode 100644 src/pkg/net/fd_poll_unix.go delete mode 100644 src/pkg/net/newpollserver_unix.go diff --git a/src/pkg/net/fd_poll_unix.go b/src/pkg/net/fd_poll_unix.go new file mode 100644 index 0000000000..93b64bbba4 --- /dev/null +++ b/src/pkg/net/fd_poll_unix.go @@ -0,0 +1,327 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build darwin freebsd linux netbsd openbsd + +package net + +import ( + "os" + "runtime" + "sync" + "syscall" + "time" +) + +// A pollServer helps FDs determine when to retry a non-blocking +// read or write after they get EAGAIN. When an FD needs to wait, +// call s.WaitRead() or s.WaitWrite() to pass the request to the poll server. +// When the pollServer finds that i/o on FD should be possible +// again, it will send on fd.cr/fd.cw to wake any waiting goroutines. +// +// To avoid races in closing, all fd operations are locked and +// refcounted. when netFD.Close() is called, it calls syscall.Shutdown +// and sets a closing flag. Only when the last reference is removed +// will the fd be closed. + +type pollServer struct { + pr, pw *os.File + poll *pollster // low-level OS hooks + sync.Mutex // controls pending and deadline + pending map[int]*netFD + deadline int64 // next deadline (nsec since 1970) +} + +func newPollServer() (s *pollServer, err error) { + s = new(pollServer) + if s.pr, s.pw, err = os.Pipe(); err != nil { + return nil, err + } + if err = syscall.SetNonblock(int(s.pr.Fd()), true); err != nil { + goto Errno + } + if err = syscall.SetNonblock(int(s.pw.Fd()), true); err != nil { + goto Errno + } + if s.poll, err = newpollster(); err != nil { + goto Error + } + if _, err = s.poll.AddFD(int(s.pr.Fd()), 'r', true); err != nil { + s.poll.Close() + goto Error + } + s.pending = make(map[int]*netFD) + go s.Run() + return s, nil + +Errno: + err = &os.PathError{ + Op: "setnonblock", + Path: s.pr.Name(), + Err: err, + } +Error: + s.pr.Close() + s.pw.Close() + return nil, err +} + +func (s *pollServer) AddFD(fd *netFD, mode int) error { + s.Lock() + intfd := fd.sysfd + if intfd < 0 || fd.closing { + // fd closed underfoot + s.Unlock() + return errClosing + } + + var t int64 + key := intfd << 1 + if mode == 'r' { + fd.ncr++ + t = fd.rdeadline.value() + } else { + fd.ncw++ + key++ + t = fd.wdeadline.value() + } + s.pending[key] = fd + doWakeup := false + if t > 0 && (s.deadline == 0 || t < s.deadline) { + s.deadline = t + doWakeup = true + } + + wake, err := s.poll.AddFD(intfd, mode, false) + s.Unlock() + if err != nil { + return &OpError{"addfd", fd.net, fd.laddr, err} + } + if wake || doWakeup { + s.Wakeup() + } + return nil +} + +// Evict evicts fd from the pending list, unblocking +// any I/O running on fd. The caller must have locked +// pollserver. +// Return value is whether the pollServer should be woken up. +func (s *pollServer) Evict(fd *netFD) bool { + doWakeup := false + if s.pending[fd.sysfd<<1] == fd { + s.WakeFD(fd, 'r', errClosing) + if s.poll.DelFD(fd.sysfd, 'r') { + doWakeup = true + } + delete(s.pending, fd.sysfd<<1) + } + if s.pending[fd.sysfd<<1|1] == fd { + s.WakeFD(fd, 'w', errClosing) + if s.poll.DelFD(fd.sysfd, 'w') { + doWakeup = true + } + delete(s.pending, fd.sysfd<<1|1) + } + return doWakeup +} + +var wakeupbuf [1]byte + +func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) } + +func (s *pollServer) LookupFD(fd int, mode int) *netFD { + key := fd << 1 + if mode == 'w' { + key++ + } + netfd, ok := s.pending[key] + if !ok { + return nil + } + delete(s.pending, key) + return netfd +} + +func (s *pollServer) WakeFD(fd *netFD, mode int, err error) { + if mode == 'r' { + for fd.ncr > 0 { + fd.ncr-- + fd.cr <- err + } + } else { + for fd.ncw > 0 { + fd.ncw-- + fd.cw <- err + } + } +} + +func (s *pollServer) CheckDeadlines() { + now := time.Now().UnixNano() + // TODO(rsc): This will need to be handled more efficiently, + // probably with a heap indexed by wakeup time. + + var nextDeadline int64 + for key, fd := range s.pending { + var t int64 + var mode int + if key&1 == 0 { + mode = 'r' + } else { + mode = 'w' + } + if mode == 'r' { + t = fd.rdeadline.value() + } else { + t = fd.wdeadline.value() + } + if t > 0 { + if t <= now { + delete(s.pending, key) + s.poll.DelFD(fd.sysfd, mode) + s.WakeFD(fd, mode, errTimeout) + } else if nextDeadline == 0 || t < nextDeadline { + nextDeadline = t + } + } + } + s.deadline = nextDeadline +} + +func (s *pollServer) Run() { + var scratch [100]byte + s.Lock() + defer s.Unlock() + for { + var timeout int64 // nsec to wait for or 0 for none + if s.deadline > 0 { + timeout = s.deadline - time.Now().UnixNano() + if timeout <= 0 { + s.CheckDeadlines() + continue + } + } + fd, mode, err := s.poll.WaitFD(s, timeout) + if err != nil { + print("pollServer WaitFD: ", err.Error(), "\n") + return + } + if fd < 0 { + // Timeout happened. + s.CheckDeadlines() + continue + } + if fd == int(s.pr.Fd()) { + // Drain our wakeup pipe (we could loop here, + // but it's unlikely that there are more than + // len(scratch) wakeup calls). + s.pr.Read(scratch[0:]) + s.CheckDeadlines() + } else { + netfd := s.LookupFD(fd, mode) + if netfd == nil { + // This can happen because the WaitFD runs without + // holding s's lock, so there might be a pending wakeup + // for an fd that has been evicted. No harm done. + continue + } + s.WakeFD(netfd, mode, nil) + } + } +} + +func (s *pollServer) PrepareRead(fd *netFD) error { + if fd.rdeadline.expired() { + return errTimeout + } + return nil +} + +func (s *pollServer) PrepareWrite(fd *netFD) error { + if fd.wdeadline.expired() { + return errTimeout + } + return nil +} + +func (s *pollServer) WaitRead(fd *netFD) error { + err := s.AddFD(fd, 'r') + if err == nil { + err = <-fd.cr + } + return err +} + +func (s *pollServer) WaitWrite(fd *netFD) error { + err := s.AddFD(fd, 'w') + if err == nil { + err = <-fd.cw + } + return err +} + +// Spread network FDs over several pollServers. + +var pollMaxN int +var pollservers []*pollServer +var startServersOnce []func() + +var canCancelIO = true // used for testing current package + +func sysInit() { + pollMaxN = runtime.NumCPU() + if pollMaxN > 8 { + pollMaxN = 8 // No improvement then. + } + pollservers = make([]*pollServer, pollMaxN) + startServersOnce = make([]func(), pollMaxN) + for i := 0; i < pollMaxN; i++ { + k := i + once := new(sync.Once) + startServersOnce[i] = func() { once.Do(func() { startServer(k) }) } + } +} + +func startServer(k int) { + p, err := newPollServer() + if err != nil { + panic(err) + } + pollservers[k] = p +} + +func pollServerInit(fd *netFD) error { + pollN := runtime.GOMAXPROCS(0) + if pollN > pollMaxN { + pollN = pollMaxN + } + k := fd.sysfd % pollN + startServersOnce[k]() + fd.pollServer = pollservers[k] + fd.cr = make(chan error, 1) + fd.cw = make(chan error, 1) + return nil +} + +func (s *pollServer) Close(fd *netFD) { +} + +// TODO(dfc) these unused error returns could be removed + +func setReadDeadline(fd *netFD, t time.Time) error { + fd.rdeadline.setTime(t) + return nil +} + +func setWriteDeadline(fd *netFD, t time.Time) error { + fd.wdeadline.setTime(t) + return nil +} + +func setDeadline(fd *netFD, t time.Time) error { + setReadDeadline(fd, t) + setWriteDeadline(fd, t) + return nil +} diff --git a/src/pkg/net/fd_unix.go b/src/pkg/net/fd_unix.go index d7a83b6393..51269d8931 100644 --- a/src/pkg/net/fd_unix.go +++ b/src/pkg/net/fd_unix.go @@ -9,7 +9,6 @@ package net import ( "io" "os" - "runtime" "sync" "syscall" "time" @@ -50,261 +49,6 @@ type netFD struct { pollServer *pollServer } -// A pollServer helps FDs determine when to retry a non-blocking -// read or write after they get EAGAIN. When an FD needs to wait, -// call s.WaitRead() or s.WaitWrite() to pass the request to the poll server. -// When the pollServer finds that i/o on FD should be possible -// again, it will send on fd.cr/fd.cw to wake any waiting goroutines. -// -// To avoid races in closing, all fd operations are locked and -// refcounted. when netFD.Close() is called, it calls syscall.Shutdown -// and sets a closing flag. Only when the last reference is removed -// will the fd be closed. - -type pollServer struct { - pr, pw *os.File - poll *pollster // low-level OS hooks - sync.Mutex // controls pending and deadline - pending map[int]*netFD - deadline int64 // next deadline (nsec since 1970) -} - -func (s *pollServer) AddFD(fd *netFD, mode int) error { - s.Lock() - intfd := fd.sysfd - if intfd < 0 || fd.closing { - // fd closed underfoot - s.Unlock() - return errClosing - } - - var t int64 - key := intfd << 1 - if mode == 'r' { - fd.ncr++ - t = fd.rdeadline.value() - } else { - fd.ncw++ - key++ - t = fd.wdeadline.value() - } - s.pending[key] = fd - doWakeup := false - if t > 0 && (s.deadline == 0 || t < s.deadline) { - s.deadline = t - doWakeup = true - } - - wake, err := s.poll.AddFD(intfd, mode, false) - s.Unlock() - if err != nil { - return &OpError{"addfd", fd.net, fd.laddr, err} - } - if wake || doWakeup { - s.Wakeup() - } - return nil -} - -// Evict evicts fd from the pending list, unblocking -// any I/O running on fd. The caller must have locked -// pollserver. -// Return value is whether the pollServer should be woken up. -func (s *pollServer) Evict(fd *netFD) bool { - doWakeup := false - if s.pending[fd.sysfd<<1] == fd { - s.WakeFD(fd, 'r', errClosing) - if s.poll.DelFD(fd.sysfd, 'r') { - doWakeup = true - } - delete(s.pending, fd.sysfd<<1) - } - if s.pending[fd.sysfd<<1|1] == fd { - s.WakeFD(fd, 'w', errClosing) - if s.poll.DelFD(fd.sysfd, 'w') { - doWakeup = true - } - delete(s.pending, fd.sysfd<<1|1) - } - return doWakeup -} - -var wakeupbuf [1]byte - -func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) } - -func (s *pollServer) LookupFD(fd int, mode int) *netFD { - key := fd << 1 - if mode == 'w' { - key++ - } - netfd, ok := s.pending[key] - if !ok { - return nil - } - delete(s.pending, key) - return netfd -} - -func (s *pollServer) WakeFD(fd *netFD, mode int, err error) { - if mode == 'r' { - for fd.ncr > 0 { - fd.ncr-- - fd.cr <- err - } - } else { - for fd.ncw > 0 { - fd.ncw-- - fd.cw <- err - } - } -} - -func (s *pollServer) CheckDeadlines() { - now := time.Now().UnixNano() - // TODO(rsc): This will need to be handled more efficiently, - // probably with a heap indexed by wakeup time. - - var nextDeadline int64 - for key, fd := range s.pending { - var t int64 - var mode int - if key&1 == 0 { - mode = 'r' - } else { - mode = 'w' - } - if mode == 'r' { - t = fd.rdeadline.value() - } else { - t = fd.wdeadline.value() - } - if t > 0 { - if t <= now { - delete(s.pending, key) - s.poll.DelFD(fd.sysfd, mode) - s.WakeFD(fd, mode, errTimeout) - } else if nextDeadline == 0 || t < nextDeadline { - nextDeadline = t - } - } - } - s.deadline = nextDeadline -} - -func (s *pollServer) Run() { - var scratch [100]byte - s.Lock() - defer s.Unlock() - for { - var timeout int64 // nsec to wait for or 0 for none - if s.deadline > 0 { - timeout = s.deadline - time.Now().UnixNano() - if timeout <= 0 { - s.CheckDeadlines() - continue - } - } - fd, mode, err := s.poll.WaitFD(s, timeout) - if err != nil { - print("pollServer WaitFD: ", err.Error(), "\n") - return - } - if fd < 0 { - // Timeout happened. - s.CheckDeadlines() - continue - } - if fd == int(s.pr.Fd()) { - // Drain our wakeup pipe (we could loop here, - // but it's unlikely that there are more than - // len(scratch) wakeup calls). - s.pr.Read(scratch[0:]) - s.CheckDeadlines() - } else { - netfd := s.LookupFD(fd, mode) - if netfd == nil { - // This can happen because the WaitFD runs without - // holding s's lock, so there might be a pending wakeup - // for an fd that has been evicted. No harm done. - continue - } - s.WakeFD(netfd, mode, nil) - } - } -} - -func (s *pollServer) PrepareRead(fd *netFD) error { - if fd.rdeadline.expired() { - return errTimeout - } - return nil -} - -func (s *pollServer) PrepareWrite(fd *netFD) error { - if fd.wdeadline.expired() { - return errTimeout - } - return nil -} - -func (s *pollServer) WaitRead(fd *netFD) error { - err := s.AddFD(fd, 'r') - if err == nil { - err = <-fd.cr - } - return err -} - -func (s *pollServer) WaitWrite(fd *netFD) error { - err := s.AddFD(fd, 'w') - if err == nil { - err = <-fd.cw - } - return err -} - -// Network FD methods. -// Spread network FDs over several pollServers. - -var pollMaxN int -var pollservers []*pollServer -var startServersOnce []func() - -var canCancelIO = true // used for testing current package - -func sysInit() { - pollMaxN = runtime.NumCPU() - if pollMaxN > 8 { - pollMaxN = 8 // No improvement then. - } - pollservers = make([]*pollServer, pollMaxN) - startServersOnce = make([]func(), pollMaxN) - for i := 0; i < pollMaxN; i++ { - k := i - once := new(sync.Once) - startServersOnce[i] = func() { once.Do(func() { startServer(k) }) } - } -} - -func startServer(k int) { - p, err := newPollServer() - if err != nil { - panic(err) - } - pollservers[k] = p -} - -func server(fd int) *pollServer { - pollN := runtime.GOMAXPROCS(0) - if pollN > pollMaxN { - pollN = pollMaxN - } - k := fd % pollN - startServersOnce[k]() - return pollservers[k] -} - func dialTimeout(net, addr string, timeout time.Duration) (Conn, error) { deadline := time.Now().Add(timeout) ra, err := resolveAddr("dial", net, addr, deadline) @@ -321,9 +65,9 @@ func newFD(fd, family, sotype int, net string) (*netFD, error) { sotype: sotype, net: net, } - netfd.cr = make(chan error, 1) - netfd.cw = make(chan error, 1) - netfd.pollServer = server(fd) + if err := pollServerInit(netfd); err != nil { + return nil, err + } return netfd, nil } @@ -391,6 +135,7 @@ func (fd *netFD) decref() { fd.sysref-- if fd.closing && fd.sysref == 0 && fd.sysfile != nil { fd.sysfile.Close() + fd.pollServer.Close(fd) fd.sysfile = nil fd.sysfd = -1 } diff --git a/src/pkg/net/newpollserver_unix.go b/src/pkg/net/newpollserver_unix.go deleted file mode 100644 index 618b5b10ba..0000000000 --- a/src/pkg/net/newpollserver_unix.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2010 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// +build darwin freebsd linux netbsd openbsd - -package net - -import ( - "os" - "syscall" -) - -func newPollServer() (s *pollServer, err error) { - s = new(pollServer) - if s.pr, s.pw, err = os.Pipe(); err != nil { - return nil, err - } - if err = syscall.SetNonblock(int(s.pr.Fd()), true); err != nil { - goto Errno - } - if err = syscall.SetNonblock(int(s.pw.Fd()), true); err != nil { - goto Errno - } - if s.poll, err = newpollster(); err != nil { - goto Error - } - if _, err = s.poll.AddFD(int(s.pr.Fd()), 'r', true); err != nil { - s.poll.Close() - goto Error - } - s.pending = make(map[int]*netFD) - go s.Run() - return s, nil - -Errno: - err = &os.PathError{ - Op: "setnonblock", - Path: s.pr.Name(), - Err: err, - } -Error: - s.pr.Close() - s.pw.Close() - return nil, err -} diff --git a/src/pkg/net/sock_posix.go b/src/pkg/net/sock_posix.go index b50a892b14..74b671c895 100644 --- a/src/pkg/net/sock_posix.go +++ b/src/pkg/net/sock_posix.go @@ -49,13 +49,17 @@ func socket(net string, f, t, p int, ipv6only bool, ulsa, ursa syscall.Sockaddr, } if ursa != nil { - fd.wdeadline.setTime(deadline) + if !deadline.IsZero() { + setWriteDeadline(fd, deadline) + } if err = fd.connect(ursa); err != nil { closesocket(s) return nil, err } fd.isConnected = true - fd.wdeadline.set(0) + if !deadline.IsZero() { + setWriteDeadline(fd, time.Time{}) + } } lsa, _ := syscall.Getsockname(s) diff --git a/src/pkg/net/sockopt_posix.go b/src/pkg/net/sockopt_posix.go index fe371fe0ce..1590f4e98d 100644 --- a/src/pkg/net/sockopt_posix.go +++ b/src/pkg/net/sockopt_posix.go @@ -11,7 +11,6 @@ package net import ( "os" "syscall" - "time" ) // Boolean to int. @@ -119,24 +118,6 @@ func setWriteBuffer(fd *netFD, bytes int) error { return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_SNDBUF, bytes)) } -// TODO(dfc) these unused error returns could be removed - -func setReadDeadline(fd *netFD, t time.Time) error { - fd.rdeadline.setTime(t) - return nil -} - -func setWriteDeadline(fd *netFD, t time.Time) error { - fd.wdeadline.setTime(t) - return nil -} - -func setDeadline(fd *netFD, t time.Time) error { - setReadDeadline(fd, t) - setWriteDeadline(fd, t) - return nil -} - func setKeepAlive(fd *netFD, keepalive bool) error { if err := fd.incref(false); err != nil { return err diff --git a/src/pkg/net/sockopt_windows.go b/src/pkg/net/sockopt_windows.go index 509b5963bf..0861fe8f4b 100644 --- a/src/pkg/net/sockopt_windows.go +++ b/src/pkg/net/sockopt_windows.go @@ -9,6 +9,7 @@ package net import ( "os" "syscall" + "time" ) func setDefaultSockopts(s syscall.Handle, f, t int, ipv6only bool) error { @@ -47,3 +48,21 @@ func setDefaultMulticastSockopts(s syscall.Handle) error { } return nil } + +// TODO(dfc) these unused error returns could be removed + +func setReadDeadline(fd *netFD, t time.Time) error { + fd.rdeadline.setTime(t) + return nil +} + +func setWriteDeadline(fd *netFD, t time.Time) error { + fd.wdeadline.setTime(t) + return nil +} + +func setDeadline(fd *netFD, t time.Time) error { + setReadDeadline(fd, t) + setWriteDeadline(fd, t) + return nil +} -- 2.48.1