]> Cypherpunks repositories - gostls13.git/commitdiff
net: refactoring in preparation for integrated network poller
authorDmitriy Vyukov <dvyukov@google.com>
Tue, 12 Mar 2013 20:03:00 +0000 (00:03 +0400)
committerDmitriy Vyukov <dvyukov@google.com>
Tue, 12 Mar 2013 20:03:00 +0000 (00:03 +0400)
Introduce pollDesc struct, to split netFD struct into fd-related
and poller-related parts.

R=golang-dev, bradfitz, iant
CC=golang-dev
https://golang.org/cl/7762044

src/pkg/net/fd_poll_unix.go
src/pkg/net/fd_unix.go
src/pkg/net/sendfile_freebsd.go
src/pkg/net/sendfile_linux.go

index 93b64bbba43db8b11e7e2f0018312c3f490254e8..7f7f764f9c72861cca8a6a8b8c93c46aa14df6ed 100644 (file)
@@ -29,10 +29,25 @@ type pollServer struct {
        pr, pw     *os.File
        poll       *pollster // low-level OS hooks
        sync.Mutex           // controls pending and deadline
-       pending    map[int]*netFD
+       pending    map[int]*pollDesc
        deadline   int64 // next deadline (nsec since 1970)
 }
 
+// A pollDesc contains netFD state related to pollServer.
+type pollDesc struct {
+       // immutable after Init()
+       pollServer *pollServer
+       sysfd      int
+       cr, cw     chan error
+
+       // mutable, protected by pollServer mutex
+       closing  bool
+       ncr, ncw int
+
+       // mutable, safe for concurrent access
+       rdeadline, wdeadline deadline
+}
+
 func newPollServer() (s *pollServer, err error) {
        s = new(pollServer)
        if s.pr, s.pw, err = os.Pipe(); err != nil {
@@ -51,7 +66,7 @@ func newPollServer() (s *pollServer, err error) {
                s.poll.Close()
                goto Error
        }
-       s.pending = make(map[int]*netFD)
+       s.pending = make(map[int]*pollDesc)
        go s.Run()
        return s, nil
 
@@ -67,10 +82,10 @@ Error:
        return nil, err
 }
 
-func (s *pollServer) AddFD(fd *netFD, mode int) error {
+func (s *pollServer) AddFD(pd *pollDesc, mode int) error {
        s.Lock()
-       intfd := fd.sysfd
-       if intfd < 0 || fd.closing {
+       intfd := pd.sysfd
+       if intfd < 0 || pd.closing {
                // fd closed underfoot
                s.Unlock()
                return errClosing
@@ -79,14 +94,14 @@ func (s *pollServer) AddFD(fd *netFD, mode int) error {
        var t int64
        key := intfd << 1
        if mode == 'r' {
-               fd.ncr++
-               t = fd.rdeadline.value()
+               pd.ncr++
+               t = pd.rdeadline.value()
        } else {
-               fd.ncw++
+               pd.ncw++
                key++
-               t = fd.wdeadline.value()
+               t = pd.wdeadline.value()
        }
-       s.pending[key] = fd
+       s.pending[key] = pd
        doWakeup := false
        if t > 0 && (s.deadline == 0 || t < s.deadline) {
                s.deadline = t
@@ -96,7 +111,7 @@ func (s *pollServer) AddFD(fd *netFD, mode int) error {
        wake, err := s.poll.AddFD(intfd, mode, false)
        s.Unlock()
        if err != nil {
-               return &OpError{"addfd", fd.net, fd.laddr, err}
+               return err
        }
        if wake || doWakeup {
                s.Wakeup()
@@ -104,25 +119,26 @@ func (s *pollServer) AddFD(fd *netFD, mode int) error {
        return nil
 }
 
-// Evict evicts fd from the pending list, unblocking
-// any I/O running on fd.  The caller must have locked
+// Evict evicts pd from the pending list, unblocking
+// any I/O running on pd.  The caller must have locked
 // pollserver.
 // Return value is whether the pollServer should be woken up.
-func (s *pollServer) Evict(fd *netFD) bool {
+func (s *pollServer) Evict(pd *pollDesc) bool {
+       pd.closing = true
        doWakeup := false
-       if s.pending[fd.sysfd<<1] == fd {
-               s.WakeFD(fd, 'r', errClosing)
-               if s.poll.DelFD(fd.sysfd, 'r') {
+       if s.pending[pd.sysfd<<1] == pd {
+               s.WakeFD(pd, 'r', errClosing)
+               if s.poll.DelFD(pd.sysfd, 'r') {
                        doWakeup = true
                }
-               delete(s.pending, fd.sysfd<<1)
+               delete(s.pending, pd.sysfd<<1)
        }
-       if s.pending[fd.sysfd<<1|1] == fd {
-               s.WakeFD(fd, 'w', errClosing)
-               if s.poll.DelFD(fd.sysfd, 'w') {
+       if s.pending[pd.sysfd<<1|1] == pd {
+               s.WakeFD(pd, 'w', errClosing)
+               if s.poll.DelFD(pd.sysfd, 'w') {
                        doWakeup = true
                }
-               delete(s.pending, fd.sysfd<<1|1)
+               delete(s.pending, pd.sysfd<<1|1)
        }
        return doWakeup
 }
@@ -131,7 +147,7 @@ var wakeupbuf [1]byte
 
 func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
 
-func (s *pollServer) LookupFD(fd int, mode int) *netFD {
+func (s *pollServer) LookupFD(fd int, mode int) *pollDesc {
        key := fd << 1
        if mode == 'w' {
                key++
@@ -144,16 +160,16 @@ func (s *pollServer) LookupFD(fd int, mode int) *netFD {
        return netfd
 }
 
-func (s *pollServer) WakeFD(fd *netFD, mode int, err error) {
+func (s *pollServer) WakeFD(pd *pollDesc, mode int, err error) {
        if mode == 'r' {
-               for fd.ncr > 0 {
-                       fd.ncr--
-                       fd.cr <- err
+               for pd.ncr > 0 {
+                       pd.ncr--
+                       pd.cr <- err
                }
        } else {
-               for fd.ncw > 0 {
-                       fd.ncw--
-                       fd.cw <- err
+               for pd.ncw > 0 {
+                       pd.ncw--
+                       pd.cw <- err
                }
        }
 }
@@ -164,7 +180,7 @@ func (s *pollServer) CheckDeadlines() {
        // probably with a heap indexed by wakeup time.
 
        var nextDeadline int64
-       for key, fd := range s.pending {
+       for key, pd := range s.pending {
                var t int64
                var mode int
                if key&1 == 0 {
@@ -173,15 +189,15 @@ func (s *pollServer) CheckDeadlines() {
                        mode = 'w'
                }
                if mode == 'r' {
-                       t = fd.rdeadline.value()
+                       t = pd.rdeadline.value()
                } else {
-                       t = fd.wdeadline.value()
+                       t = pd.wdeadline.value()
                }
                if t > 0 {
                        if t <= now {
                                delete(s.pending, key)
-                               s.poll.DelFD(fd.sysfd, mode)
-                               s.WakeFD(fd, mode, errTimeout)
+                               s.poll.DelFD(pd.sysfd, mode)
+                               s.WakeFD(pd, mode, errTimeout)
                        } else if nextDeadline == 0 || t < nextDeadline {
                                nextDeadline = t
                        }
@@ -220,48 +236,67 @@ func (s *pollServer) Run() {
                        s.pr.Read(scratch[0:])
                        s.CheckDeadlines()
                } else {
-                       netfd := s.LookupFD(fd, mode)
-                       if netfd == nil {
+                       pd := s.LookupFD(fd, mode)
+                       if pd == nil {
                                // This can happen because the WaitFD runs without
                                // holding s's lock, so there might be a pending wakeup
                                // for an fd that has been evicted.  No harm done.
                                continue
                        }
-                       s.WakeFD(netfd, mode, nil)
+                       s.WakeFD(pd, mode, nil)
                }
        }
 }
 
-func (s *pollServer) PrepareRead(fd *netFD) error {
-       if fd.rdeadline.expired() {
+func (pd *pollDesc) Close() {
+}
+
+func (pd *pollDesc) Lock() {
+       pd.pollServer.Lock()
+}
+
+func (pd *pollDesc) Unlock() {
+       pd.pollServer.Unlock()
+}
+
+func (pd *pollDesc) Wakeup() {
+       pd.pollServer.Wakeup()
+}
+
+func (pd *pollDesc) PrepareRead() error {
+       if pd.rdeadline.expired() {
                return errTimeout
        }
        return nil
 }
 
-func (s *pollServer) PrepareWrite(fd *netFD) error {
-       if fd.wdeadline.expired() {
+func (pd *pollDesc) PrepareWrite() error {
+       if pd.wdeadline.expired() {
                return errTimeout
        }
        return nil
 }
 
-func (s *pollServer) WaitRead(fd *netFD) error {
-       err := s.AddFD(fd, 'r')
+func (pd *pollDesc) WaitRead() error {
+       err := pd.pollServer.AddFD(pd, 'r')
        if err == nil {
-               err = <-fd.cr
+               err = <-pd.cr
        }
        return err
 }
 
-func (s *pollServer) WaitWrite(fd *netFD) error {
-       err := s.AddFD(fd, 'w')
+func (pd *pollDesc) WaitWrite() error {
+       err := pd.pollServer.AddFD(pd, 'w')
        if err == nil {
-               err = <-fd.cw
+               err = <-pd.cw
        }
        return err
 }
 
+func (pd *pollDesc) Evict() bool {
+       return pd.pollServer.Evict(pd)
+}
+
 // Spread network FDs over several pollServers.
 
 var pollMaxN int
@@ -292,31 +327,29 @@ func startServer(k int) {
        pollservers[k] = p
 }
 
-func pollServerInit(fd *netFD) error {
+func (pd *pollDesc) Init(fd *netFD) error {
        pollN := runtime.GOMAXPROCS(0)
        if pollN > pollMaxN {
                pollN = pollMaxN
        }
        k := fd.sysfd % pollN
        startServersOnce[k]()
-       fd.pollServer = pollservers[k]
-       fd.cr = make(chan error, 1)
-       fd.cw = make(chan error, 1)
+       pd.sysfd = fd.sysfd
+       pd.pollServer = pollservers[k]
+       pd.cr = make(chan error, 1)
+       pd.cw = make(chan error, 1)
        return nil
 }
 
-func (s *pollServer) Close(fd *netFD) {
-}
-
 // TODO(dfc) these unused error returns could be removed
 
 func setReadDeadline(fd *netFD, t time.Time) error {
-       fd.rdeadline.setTime(t)
+       fd.pd.rdeadline.setTime(t)
        return nil
 }
 
 func setWriteDeadline(fd *netFD, t time.Time) error {
-       fd.wdeadline.setTime(t)
+       fd.pd.wdeadline.setTime(t)
        return nil
 }
 
index 51269d8931c802b2ce59e95d70c38887a465efe3..5621927dc382e8812dd638f3d03910dada415224 100644 (file)
@@ -20,7 +20,7 @@ type netFD struct {
        sysmu  sync.Mutex
        sysref int
 
-       // must lock both sysmu and pollserver to write
+       // must lock both sysmu and pollDesc to write
        // can lock either to read
        closing bool
 
@@ -30,8 +30,6 @@ type netFD struct {
        sotype      int
        isConnected bool
        sysfile     *os.File
-       cr          chan error
-       cw          chan error
        net         string
        laddr       Addr
        raddr       Addr
@@ -39,14 +37,8 @@ type netFD struct {
        // serialize access to Read and Write methods
        rio, wio sync.Mutex
 
-       // read and write deadlines
-       rdeadline, wdeadline deadline
-
-       // owned by fd wait server
-       ncr, ncw int
-
        // wait server
-       pollServer *pollServer
+       pd pollDesc
 }
 
 func dialTimeout(net, addr string, timeout time.Duration) (Conn, error) {
@@ -65,7 +57,7 @@ func newFD(fd, family, sotype int, net string) (*netFD, error) {
                sotype: sotype,
                net:    net,
        }
-       if err := pollServerInit(netfd); err != nil {
+       if err := netfd.pd.Init(netfd); err != nil {
                return nil, err
        }
        return netfd, nil
@@ -91,12 +83,12 @@ func (fd *netFD) name() string {
 func (fd *netFD) connect(ra syscall.Sockaddr) error {
        fd.wio.Lock()
        defer fd.wio.Unlock()
-       if err := fd.pollServer.PrepareWrite(fd); err != nil {
+       if err := fd.pd.PrepareWrite(); err != nil {
                return err
        }
        err := syscall.Connect(fd.sysfd, ra)
        if err == syscall.EINPROGRESS {
-               if err = fd.pollServer.WaitWrite(fd); err != nil {
+               if err = fd.pd.WaitWrite(); err != nil {
                        return err
                }
                var e int
@@ -112,7 +104,7 @@ func (fd *netFD) connect(ra syscall.Sockaddr) error {
 }
 
 // Add a reference to this fd.
-// If closing==true, pollserver must be locked; mark the fd as closing.
+// If closing==true, pollDesc must be locked; mark the fd as closing.
 // Returns an error if the fd cannot be used.
 func (fd *netFD) incref(closing bool) error {
        fd.sysmu.Lock()
@@ -135,7 +127,7 @@ func (fd *netFD) decref() {
        fd.sysref--
        if fd.closing && fd.sysref == 0 && fd.sysfile != nil {
                fd.sysfile.Close()
-               fd.pollServer.Close(fd)
+               fd.pd.Close()
                fd.sysfile = nil
                fd.sysfd = -1
        }
@@ -143,21 +135,21 @@ func (fd *netFD) decref() {
 }
 
 func (fd *netFD) Close() error {
-       fd.pollServer.Lock() // needed for both fd.incref(true) and pollserver.Evict
+       fd.pd.Lock() // needed for both fd.incref(true) and pollDesc.Evict
        if err := fd.incref(true); err != nil {
-               fd.pollServer.Unlock()
+               fd.pd.Unlock()
                return err
        }
        // Unblock any I/O.  Once it all unblocks and returns,
        // so that it cannot be referring to fd.sysfd anymore,
        // the final decref will close fd.sysfd.  This should happen
        // fairly quickly, since all the I/O is non-blocking, and any
-       // attempts to block in the pollserver will return errClosing.
-       doWakeup := fd.pollServer.Evict(fd)
-       fd.pollServer.Unlock()
+       // attempts to block in the pollDesc will return errClosing.
+       doWakeup := fd.pd.Evict()
+       fd.pd.Unlock()
        fd.decref()
        if doWakeup {
-               fd.pollServer.Wakeup()
+               fd.pd.Wakeup()
        }
        return nil
 }
@@ -189,7 +181,7 @@ func (fd *netFD) Read(p []byte) (n int, err error) {
                return 0, err
        }
        defer fd.decref()
-       if err := fd.pollServer.PrepareRead(fd); err != nil {
+       if err := fd.pd.PrepareRead(); err != nil {
                return 0, &OpError{"read", fd.net, fd.raddr, err}
        }
        for {
@@ -197,7 +189,7 @@ func (fd *netFD) Read(p []byte) (n int, err error) {
                if err != nil {
                        n = 0
                        if err == syscall.EAGAIN {
-                               if err = fd.pollServer.WaitRead(fd); err == nil {
+                               if err = fd.pd.WaitRead(); err == nil {
                                        continue
                                }
                        }
@@ -218,7 +210,7 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
                return 0, nil, err
        }
        defer fd.decref()
-       if err := fd.pollServer.PrepareRead(fd); err != nil {
+       if err := fd.pd.PrepareRead(); err != nil {
                return 0, nil, &OpError{"read", fd.net, fd.laddr, err}
        }
        for {
@@ -226,7 +218,7 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
                if err != nil {
                        n = 0
                        if err == syscall.EAGAIN {
-                               if err = fd.pollServer.WaitRead(fd); err == nil {
+                               if err = fd.pd.WaitRead(); err == nil {
                                        continue
                                }
                        }
@@ -247,7 +239,7 @@ func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S
                return 0, 0, 0, nil, err
        }
        defer fd.decref()
-       if err := fd.pollServer.PrepareRead(fd); err != nil {
+       if err := fd.pd.PrepareRead(); err != nil {
                return 0, 0, 0, nil, &OpError{"read", fd.net, fd.laddr, err}
        }
        for {
@@ -255,7 +247,7 @@ func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S
                if err != nil {
                        // TODO(dfc) should n and oobn be set to 0
                        if err == syscall.EAGAIN {
-                               if err = fd.pollServer.WaitRead(fd); err == nil {
+                               if err = fd.pd.WaitRead(); err == nil {
                                        continue
                                }
                        }
@@ -283,7 +275,7 @@ func (fd *netFD) Write(p []byte) (nn int, err error) {
                return 0, err
        }
        defer fd.decref()
-       if err := fd.pollServer.PrepareWrite(fd); err != nil {
+       if err := fd.pd.PrepareWrite(); err != nil {
                return 0, &OpError{"write", fd.net, fd.raddr, err}
        }
        for {
@@ -296,7 +288,7 @@ func (fd *netFD) Write(p []byte) (nn int, err error) {
                        break
                }
                if err == syscall.EAGAIN {
-                       if err = fd.pollServer.WaitWrite(fd); err == nil {
+                       if err = fd.pd.WaitWrite(); err == nil {
                                continue
                        }
                }
@@ -322,13 +314,13 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
                return 0, err
        }
        defer fd.decref()
-       if err := fd.pollServer.PrepareWrite(fd); err != nil {
+       if err := fd.pd.PrepareWrite(); err != nil {
                return 0, &OpError{"write", fd.net, fd.raddr, err}
        }
        for {
                err = syscall.Sendto(fd.sysfd, p, 0, sa)
                if err == syscall.EAGAIN {
-                       if err = fd.pollServer.WaitWrite(fd); err == nil {
+                       if err = fd.pd.WaitWrite(); err == nil {
                                continue
                        }
                }
@@ -349,13 +341,13 @@ func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob
                return 0, 0, err
        }
        defer fd.decref()
-       if err := fd.pollServer.PrepareWrite(fd); err != nil {
+       if err := fd.pd.PrepareWrite(); err != nil {
                return 0, 0, &OpError{"write", fd.net, fd.raddr, err}
        }
        for {
                err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0)
                if err == syscall.EAGAIN {
-                       if err = fd.pollServer.WaitWrite(fd); err == nil {
+                       if err = fd.pd.WaitWrite(); err == nil {
                                continue
                        }
                }
@@ -380,14 +372,14 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err e
 
        var s int
        var rsa syscall.Sockaddr
-       if err = fd.pollServer.PrepareRead(fd); err != nil {
+       if err = fd.pd.PrepareRead(); err != nil {
                return nil, &OpError{"accept", fd.net, fd.laddr, err}
        }
        for {
                s, rsa, err = accept(fd.sysfd)
                if err != nil {
                        if err == syscall.EAGAIN {
-                               if err = fd.pollServer.WaitRead(fd); err == nil {
+                               if err = fd.pd.WaitRead(); err == nil {
                                        continue
                                }
                        } else if err == syscall.ECONNABORTED {
index 8008bc3b56047d0ef3433bd7a150e4742dda9e3a..dc5b767557b5b166c012ef3648cfce2dd41e4a53 100644 (file)
@@ -83,7 +83,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
                        break
                }
                if err1 == syscall.EAGAIN {
-                       if err1 = c.pollServer.WaitWrite(c); err1 == nil {
+                       if err1 = c.pd.WaitWrite(); err1 == nil {
                                continue
                        }
                }
index 3357e653869a59b4860c101c5b0dd32772912d0a..6f1323b3dcdc234abb3103cb7b1555603ad1f47f 100644 (file)
@@ -59,7 +59,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
                        break
                }
                if err1 == syscall.EAGAIN {
-                       if err1 = c.pollServer.WaitWrite(c); err1 == nil {
+                       if err1 = c.pd.WaitWrite(); err1 == nil {
                                continue
                        }
                }