From: Dmitriy Vyukov Date: Tue, 12 Mar 2013 20:03:00 +0000 (+0400) Subject: net: refactoring in preparation for integrated network poller X-Git-Tag: go1.1rc2~551 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=b09d88179909a31579d373ff7cccc266604b7ee9;p=gostls13.git net: refactoring in preparation for integrated network poller Introduce pollDesc struct, to split netFD struct into fd-related and poller-related parts. R=golang-dev, bradfitz, iant CC=golang-dev https://golang.org/cl/7762044 --- diff --git a/src/pkg/net/fd_poll_unix.go b/src/pkg/net/fd_poll_unix.go index 93b64bbba4..7f7f764f9c 100644 --- a/src/pkg/net/fd_poll_unix.go +++ b/src/pkg/net/fd_poll_unix.go @@ -29,10 +29,25 @@ type pollServer struct { pr, pw *os.File poll *pollster // low-level OS hooks sync.Mutex // controls pending and deadline - pending map[int]*netFD + pending map[int]*pollDesc deadline int64 // next deadline (nsec since 1970) } +// A pollDesc contains netFD state related to pollServer. +type pollDesc struct { + // immutable after Init() + pollServer *pollServer + sysfd int + cr, cw chan error + + // mutable, protected by pollServer mutex + closing bool + ncr, ncw int + + // mutable, safe for concurrent access + rdeadline, wdeadline deadline +} + func newPollServer() (s *pollServer, err error) { s = new(pollServer) if s.pr, s.pw, err = os.Pipe(); err != nil { @@ -51,7 +66,7 @@ func newPollServer() (s *pollServer, err error) { s.poll.Close() goto Error } - s.pending = make(map[int]*netFD) + s.pending = make(map[int]*pollDesc) go s.Run() return s, nil @@ -67,10 +82,10 @@ Error: return nil, err } -func (s *pollServer) AddFD(fd *netFD, mode int) error { +func (s *pollServer) AddFD(pd *pollDesc, mode int) error { s.Lock() - intfd := fd.sysfd - if intfd < 0 || fd.closing { + intfd := pd.sysfd + if intfd < 0 || pd.closing { // fd closed underfoot s.Unlock() return errClosing @@ -79,14 +94,14 @@ func (s *pollServer) AddFD(fd *netFD, mode int) error { var t int64 key := intfd << 1 if mode == 'r' { - fd.ncr++ - t = fd.rdeadline.value() + pd.ncr++ + t = pd.rdeadline.value() } else { - fd.ncw++ + pd.ncw++ key++ - t = fd.wdeadline.value() + t = pd.wdeadline.value() } - s.pending[key] = fd + s.pending[key] = pd doWakeup := false if t > 0 && (s.deadline == 0 || t < s.deadline) { s.deadline = t @@ -96,7 +111,7 @@ func (s *pollServer) AddFD(fd *netFD, mode int) error { wake, err := s.poll.AddFD(intfd, mode, false) s.Unlock() if err != nil { - return &OpError{"addfd", fd.net, fd.laddr, err} + return err } if wake || doWakeup { s.Wakeup() @@ -104,25 +119,26 @@ func (s *pollServer) AddFD(fd *netFD, mode int) error { return nil } -// Evict evicts fd from the pending list, unblocking -// any I/O running on fd. The caller must have locked +// Evict evicts pd from the pending list, unblocking +// any I/O running on pd. The caller must have locked // pollserver. // Return value is whether the pollServer should be woken up. -func (s *pollServer) Evict(fd *netFD) bool { +func (s *pollServer) Evict(pd *pollDesc) bool { + pd.closing = true doWakeup := false - if s.pending[fd.sysfd<<1] == fd { - s.WakeFD(fd, 'r', errClosing) - if s.poll.DelFD(fd.sysfd, 'r') { + if s.pending[pd.sysfd<<1] == pd { + s.WakeFD(pd, 'r', errClosing) + if s.poll.DelFD(pd.sysfd, 'r') { doWakeup = true } - delete(s.pending, fd.sysfd<<1) + delete(s.pending, pd.sysfd<<1) } - if s.pending[fd.sysfd<<1|1] == fd { - s.WakeFD(fd, 'w', errClosing) - if s.poll.DelFD(fd.sysfd, 'w') { + if s.pending[pd.sysfd<<1|1] == pd { + s.WakeFD(pd, 'w', errClosing) + if s.poll.DelFD(pd.sysfd, 'w') { doWakeup = true } - delete(s.pending, fd.sysfd<<1|1) + delete(s.pending, pd.sysfd<<1|1) } return doWakeup } @@ -131,7 +147,7 @@ var wakeupbuf [1]byte func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) } -func (s *pollServer) LookupFD(fd int, mode int) *netFD { +func (s *pollServer) LookupFD(fd int, mode int) *pollDesc { key := fd << 1 if mode == 'w' { key++ @@ -144,16 +160,16 @@ func (s *pollServer) LookupFD(fd int, mode int) *netFD { return netfd } -func (s *pollServer) WakeFD(fd *netFD, mode int, err error) { +func (s *pollServer) WakeFD(pd *pollDesc, mode int, err error) { if mode == 'r' { - for fd.ncr > 0 { - fd.ncr-- - fd.cr <- err + for pd.ncr > 0 { + pd.ncr-- + pd.cr <- err } } else { - for fd.ncw > 0 { - fd.ncw-- - fd.cw <- err + for pd.ncw > 0 { + pd.ncw-- + pd.cw <- err } } } @@ -164,7 +180,7 @@ func (s *pollServer) CheckDeadlines() { // probably with a heap indexed by wakeup time. var nextDeadline int64 - for key, fd := range s.pending { + for key, pd := range s.pending { var t int64 var mode int if key&1 == 0 { @@ -173,15 +189,15 @@ func (s *pollServer) CheckDeadlines() { mode = 'w' } if mode == 'r' { - t = fd.rdeadline.value() + t = pd.rdeadline.value() } else { - t = fd.wdeadline.value() + t = pd.wdeadline.value() } if t > 0 { if t <= now { delete(s.pending, key) - s.poll.DelFD(fd.sysfd, mode) - s.WakeFD(fd, mode, errTimeout) + s.poll.DelFD(pd.sysfd, mode) + s.WakeFD(pd, mode, errTimeout) } else if nextDeadline == 0 || t < nextDeadline { nextDeadline = t } @@ -220,48 +236,67 @@ func (s *pollServer) Run() { s.pr.Read(scratch[0:]) s.CheckDeadlines() } else { - netfd := s.LookupFD(fd, mode) - if netfd == nil { + pd := s.LookupFD(fd, mode) + if pd == nil { // This can happen because the WaitFD runs without // holding s's lock, so there might be a pending wakeup // for an fd that has been evicted. No harm done. continue } - s.WakeFD(netfd, mode, nil) + s.WakeFD(pd, mode, nil) } } } -func (s *pollServer) PrepareRead(fd *netFD) error { - if fd.rdeadline.expired() { +func (pd *pollDesc) Close() { +} + +func (pd *pollDesc) Lock() { + pd.pollServer.Lock() +} + +func (pd *pollDesc) Unlock() { + pd.pollServer.Unlock() +} + +func (pd *pollDesc) Wakeup() { + pd.pollServer.Wakeup() +} + +func (pd *pollDesc) PrepareRead() error { + if pd.rdeadline.expired() { return errTimeout } return nil } -func (s *pollServer) PrepareWrite(fd *netFD) error { - if fd.wdeadline.expired() { +func (pd *pollDesc) PrepareWrite() error { + if pd.wdeadline.expired() { return errTimeout } return nil } -func (s *pollServer) WaitRead(fd *netFD) error { - err := s.AddFD(fd, 'r') +func (pd *pollDesc) WaitRead() error { + err := pd.pollServer.AddFD(pd, 'r') if err == nil { - err = <-fd.cr + err = <-pd.cr } return err } -func (s *pollServer) WaitWrite(fd *netFD) error { - err := s.AddFD(fd, 'w') +func (pd *pollDesc) WaitWrite() error { + err := pd.pollServer.AddFD(pd, 'w') if err == nil { - err = <-fd.cw + err = <-pd.cw } return err } +func (pd *pollDesc) Evict() bool { + return pd.pollServer.Evict(pd) +} + // Spread network FDs over several pollServers. var pollMaxN int @@ -292,31 +327,29 @@ func startServer(k int) { pollservers[k] = p } -func pollServerInit(fd *netFD) error { +func (pd *pollDesc) Init(fd *netFD) error { pollN := runtime.GOMAXPROCS(0) if pollN > pollMaxN { pollN = pollMaxN } k := fd.sysfd % pollN startServersOnce[k]() - fd.pollServer = pollservers[k] - fd.cr = make(chan error, 1) - fd.cw = make(chan error, 1) + pd.sysfd = fd.sysfd + pd.pollServer = pollservers[k] + pd.cr = make(chan error, 1) + pd.cw = make(chan error, 1) return nil } -func (s *pollServer) Close(fd *netFD) { -} - // TODO(dfc) these unused error returns could be removed func setReadDeadline(fd *netFD, t time.Time) error { - fd.rdeadline.setTime(t) + fd.pd.rdeadline.setTime(t) return nil } func setWriteDeadline(fd *netFD, t time.Time) error { - fd.wdeadline.setTime(t) + fd.pd.wdeadline.setTime(t) return nil } diff --git a/src/pkg/net/fd_unix.go b/src/pkg/net/fd_unix.go index 51269d8931..5621927dc3 100644 --- a/src/pkg/net/fd_unix.go +++ b/src/pkg/net/fd_unix.go @@ -20,7 +20,7 @@ type netFD struct { sysmu sync.Mutex sysref int - // must lock both sysmu and pollserver to write + // must lock both sysmu and pollDesc to write // can lock either to read closing bool @@ -30,8 +30,6 @@ type netFD struct { sotype int isConnected bool sysfile *os.File - cr chan error - cw chan error net string laddr Addr raddr Addr @@ -39,14 +37,8 @@ type netFD struct { // serialize access to Read and Write methods rio, wio sync.Mutex - // read and write deadlines - rdeadline, wdeadline deadline - - // owned by fd wait server - ncr, ncw int - // wait server - pollServer *pollServer + pd pollDesc } func dialTimeout(net, addr string, timeout time.Duration) (Conn, error) { @@ -65,7 +57,7 @@ func newFD(fd, family, sotype int, net string) (*netFD, error) { sotype: sotype, net: net, } - if err := pollServerInit(netfd); err != nil { + if err := netfd.pd.Init(netfd); err != nil { return nil, err } return netfd, nil @@ -91,12 +83,12 @@ func (fd *netFD) name() string { func (fd *netFD) connect(ra syscall.Sockaddr) error { fd.wio.Lock() defer fd.wio.Unlock() - if err := fd.pollServer.PrepareWrite(fd); err != nil { + if err := fd.pd.PrepareWrite(); err != nil { return err } err := syscall.Connect(fd.sysfd, ra) if err == syscall.EINPROGRESS { - if err = fd.pollServer.WaitWrite(fd); err != nil { + if err = fd.pd.WaitWrite(); err != nil { return err } var e int @@ -112,7 +104,7 @@ func (fd *netFD) connect(ra syscall.Sockaddr) error { } // Add a reference to this fd. -// If closing==true, pollserver must be locked; mark the fd as closing. +// If closing==true, pollDesc must be locked; mark the fd as closing. // Returns an error if the fd cannot be used. func (fd *netFD) incref(closing bool) error { fd.sysmu.Lock() @@ -135,7 +127,7 @@ func (fd *netFD) decref() { fd.sysref-- if fd.closing && fd.sysref == 0 && fd.sysfile != nil { fd.sysfile.Close() - fd.pollServer.Close(fd) + fd.pd.Close() fd.sysfile = nil fd.sysfd = -1 } @@ -143,21 +135,21 @@ func (fd *netFD) decref() { } func (fd *netFD) Close() error { - fd.pollServer.Lock() // needed for both fd.incref(true) and pollserver.Evict + fd.pd.Lock() // needed for both fd.incref(true) and pollDesc.Evict if err := fd.incref(true); err != nil { - fd.pollServer.Unlock() + fd.pd.Unlock() return err } // Unblock any I/O. Once it all unblocks and returns, // so that it cannot be referring to fd.sysfd anymore, // the final decref will close fd.sysfd. This should happen // fairly quickly, since all the I/O is non-blocking, and any - // attempts to block in the pollserver will return errClosing. - doWakeup := fd.pollServer.Evict(fd) - fd.pollServer.Unlock() + // attempts to block in the pollDesc will return errClosing. + doWakeup := fd.pd.Evict() + fd.pd.Unlock() fd.decref() if doWakeup { - fd.pollServer.Wakeup() + fd.pd.Wakeup() } return nil } @@ -189,7 +181,7 @@ func (fd *netFD) Read(p []byte) (n int, err error) { return 0, err } defer fd.decref() - if err := fd.pollServer.PrepareRead(fd); err != nil { + if err := fd.pd.PrepareRead(); err != nil { return 0, &OpError{"read", fd.net, fd.raddr, err} } for { @@ -197,7 +189,7 @@ func (fd *netFD) Read(p []byte) (n int, err error) { if err != nil { n = 0 if err == syscall.EAGAIN { - if err = fd.pollServer.WaitRead(fd); err == nil { + if err = fd.pd.WaitRead(); err == nil { continue } } @@ -218,7 +210,7 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) { return 0, nil, err } defer fd.decref() - if err := fd.pollServer.PrepareRead(fd); err != nil { + if err := fd.pd.PrepareRead(); err != nil { return 0, nil, &OpError{"read", fd.net, fd.laddr, err} } for { @@ -226,7 +218,7 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) { if err != nil { n = 0 if err == syscall.EAGAIN { - if err = fd.pollServer.WaitRead(fd); err == nil { + if err = fd.pd.WaitRead(); err == nil { continue } } @@ -247,7 +239,7 @@ func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S return 0, 0, 0, nil, err } defer fd.decref() - if err := fd.pollServer.PrepareRead(fd); err != nil { + if err := fd.pd.PrepareRead(); err != nil { return 0, 0, 0, nil, &OpError{"read", fd.net, fd.laddr, err} } for { @@ -255,7 +247,7 @@ func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S if err != nil { // TODO(dfc) should n and oobn be set to 0 if err == syscall.EAGAIN { - if err = fd.pollServer.WaitRead(fd); err == nil { + if err = fd.pd.WaitRead(); err == nil { continue } } @@ -283,7 +275,7 @@ func (fd *netFD) Write(p []byte) (nn int, err error) { return 0, err } defer fd.decref() - if err := fd.pollServer.PrepareWrite(fd); err != nil { + if err := fd.pd.PrepareWrite(); err != nil { return 0, &OpError{"write", fd.net, fd.raddr, err} } for { @@ -296,7 +288,7 @@ func (fd *netFD) Write(p []byte) (nn int, err error) { break } if err == syscall.EAGAIN { - if err = fd.pollServer.WaitWrite(fd); err == nil { + if err = fd.pd.WaitWrite(); err == nil { continue } } @@ -322,13 +314,13 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) { return 0, err } defer fd.decref() - if err := fd.pollServer.PrepareWrite(fd); err != nil { + if err := fd.pd.PrepareWrite(); err != nil { return 0, &OpError{"write", fd.net, fd.raddr, err} } for { err = syscall.Sendto(fd.sysfd, p, 0, sa) if err == syscall.EAGAIN { - if err = fd.pollServer.WaitWrite(fd); err == nil { + if err = fd.pd.WaitWrite(); err == nil { continue } } @@ -349,13 +341,13 @@ func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob return 0, 0, err } defer fd.decref() - if err := fd.pollServer.PrepareWrite(fd); err != nil { + if err := fd.pd.PrepareWrite(); err != nil { return 0, 0, &OpError{"write", fd.net, fd.raddr, err} } for { err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0) if err == syscall.EAGAIN { - if err = fd.pollServer.WaitWrite(fd); err == nil { + if err = fd.pd.WaitWrite(); err == nil { continue } } @@ -380,14 +372,14 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err e var s int var rsa syscall.Sockaddr - if err = fd.pollServer.PrepareRead(fd); err != nil { + if err = fd.pd.PrepareRead(); err != nil { return nil, &OpError{"accept", fd.net, fd.laddr, err} } for { s, rsa, err = accept(fd.sysfd) if err != nil { if err == syscall.EAGAIN { - if err = fd.pollServer.WaitRead(fd); err == nil { + if err = fd.pd.WaitRead(); err == nil { continue } } else if err == syscall.ECONNABORTED { diff --git a/src/pkg/net/sendfile_freebsd.go b/src/pkg/net/sendfile_freebsd.go index 8008bc3b56..dc5b767557 100644 --- a/src/pkg/net/sendfile_freebsd.go +++ b/src/pkg/net/sendfile_freebsd.go @@ -83,7 +83,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { break } if err1 == syscall.EAGAIN { - if err1 = c.pollServer.WaitWrite(c); err1 == nil { + if err1 = c.pd.WaitWrite(); err1 == nil { continue } } diff --git a/src/pkg/net/sendfile_linux.go b/src/pkg/net/sendfile_linux.go index 3357e65386..6f1323b3dc 100644 --- a/src/pkg/net/sendfile_linux.go +++ b/src/pkg/net/sendfile_linux.go @@ -59,7 +59,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { break } if err1 == syscall.EAGAIN { - if err1 = c.pollServer.WaitWrite(c); err1 == nil { + if err1 = c.pd.WaitWrite(); err1 == nil { continue } }