]> Cypherpunks repositories - gostls13.git/commitdiff
net: add special netFD mutex
authorDmitriy Vyukov <dvyukov@google.com>
Fri, 9 Aug 2013 17:43:00 +0000 (21:43 +0400)
committerDmitriy Vyukov <dvyukov@google.com>
Fri, 9 Aug 2013 17:43:00 +0000 (21:43 +0400)
The mutex, fdMutex, handles locking and lifetime of sysfd,
and serializes Read and Write methods.
This allows to strip 2 sync.Mutex.Lock calls,
2 sync.Mutex.Unlock calls, 1 defer and some amount
of misc overhead from every network operation.

On linux/amd64, Intel E5-2690:
benchmark                             old ns/op    new ns/op    delta
BenchmarkTCP4Persistent                    9595         9454   -1.47%
BenchmarkTCP4Persistent-2                  8978         8772   -2.29%
BenchmarkTCP4ConcurrentReadWrite           4900         4625   -5.61%
BenchmarkTCP4ConcurrentReadWrite-2         2603         2500   -3.96%

In general it strips 70-500 ns from every network operation depending
on processor model. On my relatively new E5-2690 it accounts to ~5%
of network op cost.

Fixes #6074.

R=golang-dev, bradfitz, alex.brainman, iant, mikioh.mikioh
CC=golang-dev
https://golang.org/cl/12418043

25 files changed:
src/pkg/net/fd_mutex.go [new file with mode: 0644]
src/pkg/net/fd_mutex_test.go [new file with mode: 0644]
src/pkg/net/fd_poll_runtime.go
src/pkg/net/fd_unix.go
src/pkg/net/fd_windows.go
src/pkg/net/sendfile_freebsd.go
src/pkg/net/sendfile_linux.go
src/pkg/net/sendfile_windows.go
src/pkg/net/sockopt_posix.go
src/pkg/net/sockoptip_bsd.go
src/pkg/net/sockoptip_linux.go
src/pkg/net/sockoptip_posix.go
src/pkg/net/sockoptip_windows.go
src/pkg/net/tcpsockopt_darwin.go
src/pkg/net/tcpsockopt_openbsd.go
src/pkg/net/tcpsockopt_posix.go
src/pkg/net/tcpsockopt_unix.go
src/pkg/net/tcpsockopt_windows.go
src/pkg/runtime/mgc0.c
src/pkg/runtime/mprof.goc
src/pkg/runtime/netpoll.goc
src/pkg/runtime/proc.c
src/pkg/runtime/race.c
src/pkg/runtime/runtime.h
src/pkg/runtime/sema.goc

diff --git a/src/pkg/net/fd_mutex.go b/src/pkg/net/fd_mutex.go
new file mode 100644 (file)
index 0000000..1caf974
--- /dev/null
@@ -0,0 +1,184 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import "sync/atomic"
+
+// fdMutex is a specialized synchronization primitive
+// that manages lifetime of an fd and serializes access
+// to Read and Write methods on netFD.
+type fdMutex struct {
+       state uint64
+       rsema uint32
+       wsema uint32
+}
+
+// fdMutex.state is organized as follows:
+// 1 bit - whether netFD is closed, if set all subsequent lock operations will fail.
+// 1 bit - lock for read operations.
+// 1 bit - lock for write operations.
+// 20 bits - total number of references (read+write+misc).
+// 20 bits - number of outstanding read waiters.
+// 20 bits - number of outstanding write waiters.
+const (
+       mutexClosed  = 1 << 0
+       mutexRLock   = 1 << 1
+       mutexWLock   = 1 << 2
+       mutexRef     = 1 << 3
+       mutexRefMask = (1<<20 - 1) << 3
+       mutexRWait   = 1 << 23
+       mutexRMask   = (1<<20 - 1) << 23
+       mutexWWait   = 1 << 43
+       mutexWMask   = (1<<20 - 1) << 43
+)
+
+// Read operations must do RWLock(true)/RWUnlock(true).
+// Write operations must do RWLock(false)/RWUnlock(false).
+// Misc operations must do Incref/Decref. Misc operations include functions like
+// setsockopt and setDeadline. They need to use Incref/Decref to ensure that
+// they operate on the correct fd in presence of a concurrent Close call
+// (otherwise fd can be closed under their feet).
+// Close operation must do IncrefAndClose/Decref.
+
+// RWLock/Incref return whether fd is open.
+// RWUnlock/Decref return whether fd is closed and there are no remaining references.
+
+func (mu *fdMutex) Incref() bool {
+       for {
+               old := atomic.LoadUint64(&mu.state)
+               if old&mutexClosed != 0 {
+                       return false
+               }
+               new := old + mutexRef
+               if new&mutexRefMask == 0 {
+                       panic("net: inconsistent fdMutex")
+               }
+               if atomic.CompareAndSwapUint64(&mu.state, old, new) {
+                       return true
+               }
+       }
+}
+
+func (mu *fdMutex) IncrefAndClose() bool {
+       for {
+               old := atomic.LoadUint64(&mu.state)
+               if old&mutexClosed != 0 {
+                       return false
+               }
+               // Mark as closed and acquire a reference.
+               new := (old | mutexClosed) + mutexRef
+               if new&mutexRefMask == 0 {
+                       panic("net: inconsistent fdMutex")
+               }
+               // Remove all read and write waiters.
+               new &^= mutexRMask | mutexWMask
+               if atomic.CompareAndSwapUint64(&mu.state, old, new) {
+                       // Wake all read and write waiters,
+                       // they will observe closed flag after wakeup.
+                       for old&mutexRMask != 0 {
+                               old -= mutexRWait
+                               runtime_Semrelease(&mu.rsema)
+                       }
+                       for old&mutexWMask != 0 {
+                               old -= mutexWWait
+                               runtime_Semrelease(&mu.wsema)
+                       }
+                       return true
+               }
+       }
+}
+
+func (mu *fdMutex) Decref() bool {
+       for {
+               old := atomic.LoadUint64(&mu.state)
+               if old&mutexRefMask == 0 {
+                       panic("net: inconsistent fdMutex")
+               }
+               new := old - mutexRef
+               if atomic.CompareAndSwapUint64(&mu.state, old, new) {
+                       return new&(mutexClosed|mutexRef) == mutexClosed
+               }
+       }
+}
+
+func (mu *fdMutex) RWLock(read bool) bool {
+       var mutexBit, mutexWait, mutexMask uint64
+       var mutexSema *uint32
+       if read {
+               mutexBit = mutexRLock
+               mutexWait = mutexRWait
+               mutexMask = mutexRMask
+               mutexSema = &mu.rsema
+       } else {
+               mutexBit = mutexWLock
+               mutexWait = mutexWWait
+               mutexMask = mutexWMask
+               mutexSema = &mu.wsema
+       }
+       for {
+               old := atomic.LoadUint64(&mu.state)
+               if old&mutexClosed != 0 {
+                       return false
+               }
+               var new uint64
+               if old&mutexBit == 0 {
+                       // Lock is free, acquire it.
+                       new = (old | mutexBit) + mutexRef
+                       if new&mutexRefMask == 0 {
+                               panic("net: inconsistent fdMutex")
+                       }
+               } else {
+                       // Wait for lock.
+                       new = old + mutexWait
+                       if new&mutexMask == 0 {
+                               panic("net: inconsistent fdMutex")
+                       }
+               }
+               if atomic.CompareAndSwapUint64(&mu.state, old, new) {
+                       if old&mutexBit == 0 {
+                               return true
+                       }
+                       runtime_Semacquire(mutexSema)
+                       // The signaller has subtracted mutexWait.
+               }
+       }
+}
+
+func (mu *fdMutex) RWUnlock(read bool) bool {
+       var mutexBit, mutexWait, mutexMask uint64
+       var mutexSema *uint32
+       if read {
+               mutexBit = mutexRLock
+               mutexWait = mutexRWait
+               mutexMask = mutexRMask
+               mutexSema = &mu.rsema
+       } else {
+               mutexBit = mutexWLock
+               mutexWait = mutexWWait
+               mutexMask = mutexWMask
+               mutexSema = &mu.wsema
+       }
+       for {
+               old := atomic.LoadUint64(&mu.state)
+               if old&mutexBit == 0 || old&mutexRefMask == 0 {
+                       panic("net: inconsistent fdMutex")
+               }
+               // Drop lock, drop reference and wake read waiter if present.
+               new := (old &^ mutexBit) - mutexRef
+               if old&mutexMask != 0 {
+                       new -= mutexWait
+               }
+               if atomic.CompareAndSwapUint64(&mu.state, old, new) {
+                       if old&mutexMask != 0 {
+                               runtime_Semrelease(mutexSema)
+                       }
+                       return new&(mutexClosed|mutexRef) == mutexClosed
+               }
+       }
+}
+
+// Implemented in runtime package.
+func runtime_Semacquire(sema *uint32)
+func runtime_Semrelease(sema *uint32)
diff --git a/src/pkg/net/fd_mutex_test.go b/src/pkg/net/fd_mutex_test.go
new file mode 100644 (file)
index 0000000..8383084
--- /dev/null
@@ -0,0 +1,186 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import (
+       "math/rand"
+       "runtime"
+       "testing"
+       "time"
+)
+
+func TestMutexLock(t *testing.T) {
+       var mu fdMutex
+
+       if !mu.Incref() {
+               t.Fatal("broken")
+       }
+       if mu.Decref() {
+               t.Fatal("broken")
+       }
+
+       if !mu.RWLock(true) {
+               t.Fatal("broken")
+       }
+       if mu.RWUnlock(true) {
+               t.Fatal("broken")
+       }
+
+       if !mu.RWLock(false) {
+               t.Fatal("broken")
+       }
+       if mu.RWUnlock(false) {
+               t.Fatal("broken")
+       }
+}
+
+func TestMutexClose(t *testing.T) {
+       var mu fdMutex
+       if !mu.IncrefAndClose() {
+               t.Fatal("broken")
+       }
+
+       if mu.Incref() {
+               t.Fatal("broken")
+       }
+       if mu.RWLock(true) {
+               t.Fatal("broken")
+       }
+       if mu.RWLock(false) {
+               t.Fatal("broken")
+       }
+       if mu.IncrefAndClose() {
+               t.Fatal("broken")
+       }
+}
+
+func TestMutexCloseUnblock(t *testing.T) {
+       c := make(chan bool)
+       var mu fdMutex
+       mu.RWLock(true)
+       for i := 0; i < 4; i++ {
+               go func() {
+                       if mu.RWLock(true) {
+                               t.Fatal("broken")
+                       }
+                       c <- true
+               }()
+       }
+       // Concurrent goroutines must not be able to read lock the mutex.
+       time.Sleep(time.Millisecond)
+       select {
+       case <-c:
+               t.Fatal("broken")
+       default:
+       }
+       mu.IncrefAndClose() // Must unblock the readers.
+       for i := 0; i < 4; i++ {
+               select {
+               case <-c:
+               case <-time.After(10 * time.Second):
+                       t.Fatal("broken")
+               }
+       }
+       if mu.Decref() {
+               t.Fatal("broken")
+       }
+       if !mu.RWUnlock(true) {
+               t.Fatal("broken")
+       }
+}
+
+func TestMutexPanic(t *testing.T) {
+       ensurePanics := func(f func()) {
+               defer func() {
+                       if recover() == nil {
+                               t.Fatal("does not panic")
+                       }
+               }()
+               f()
+       }
+
+       var mu fdMutex
+       ensurePanics(func() { mu.Decref() })
+       ensurePanics(func() { mu.RWUnlock(true) })
+       ensurePanics(func() { mu.RWUnlock(false) })
+
+       ensurePanics(func() { mu.Incref(); mu.Decref(); mu.Decref() })
+       ensurePanics(func() { mu.RWLock(true); mu.RWUnlock(true); mu.RWUnlock(true) })
+       ensurePanics(func() { mu.RWLock(false); mu.RWUnlock(false); mu.RWUnlock(false) })
+
+       // ensure that it's still not broken
+       mu.Incref()
+       mu.Decref()
+       mu.RWLock(true)
+       mu.RWUnlock(true)
+       mu.RWLock(false)
+       mu.RWUnlock(false)
+}
+
+func TestMutexStress(t *testing.T) {
+       P := 8
+       N := int(1e6)
+       if testing.Short() {
+               P = 4
+               N = 1e4
+       }
+       defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P))
+       done := make(chan bool)
+       var mu fdMutex
+       var readState [2]uint64
+       var writeState [2]uint64
+       for p := 0; p < P; p++ {
+               go func() {
+                       r := rand.New(rand.NewSource(rand.Int63()))
+                       for i := 0; i < N; i++ {
+                               switch r.Intn(3) {
+                               case 0:
+                                       if !mu.Incref() {
+                                               t.Fatal("broken")
+                                       }
+                                       if mu.Decref() {
+                                               t.Fatal("broken")
+                                       }
+                               case 1:
+                                       if !mu.RWLock(true) {
+                                               t.Fatal("broken")
+                                       }
+                                       // Ensure that it provides mutual exclusion for readers.
+                                       if readState[0] != readState[1] {
+                                               t.Fatal("broken")
+                                       }
+                                       readState[0]++
+                                       readState[1]++
+                                       if mu.RWUnlock(true) {
+                                               t.Fatal("broken")
+                                       }
+                               case 2:
+                                       if !mu.RWLock(false) {
+                                               t.Fatal("broken")
+                                       }
+                                       // Ensure that it provides mutual exclusion for writers.
+                                       if writeState[0] != writeState[1] {
+                                               t.Fatal("broken")
+                                       }
+                                       writeState[0]++
+                                       writeState[1]++
+                                       if mu.RWUnlock(false) {
+                                               t.Fatal("broken")
+                                       }
+                               }
+                       }
+                       done <- true
+               }()
+       }
+       for p := 0; p < P; p++ {
+               <-done
+       }
+       if !mu.IncrefAndClose() {
+               t.Fatal("broken")
+       }
+       if !mu.Decref() {
+               t.Fatal("broken")
+       }
+}
index 6ae5c609ac4e32bf2464d0d09ffb71a92904dc94..03474cf2c37d1d2acc4f498ab0dceabb143d55dc 100644 (file)
@@ -132,7 +132,7 @@ func setDeadlineImpl(fd *netFD, t time.Time, mode int) error {
        if t.IsZero() {
                d = 0
        }
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
index a2a771491e475a33750b9aacafeae9af7bfb4634..f704c0a2a0a61d850ec9850be1502ab06ebcce7d 100644 (file)
@@ -10,7 +10,6 @@ import (
        "io"
        "os"
        "runtime"
-       "sync"
        "sync/atomic"
        "syscall"
        "time"
@@ -18,13 +17,8 @@ import (
 
 // Network file descriptor.
 type netFD struct {
-       // locking/lifetime of sysfd
-       sysmu  sync.Mutex
-       sysref int
-
-       // must lock both sysmu and pollDesc to write
-       // can lock either to read
-       closing bool
+       // locking/lifetime of sysfd + serialize access to Read and Write methods
+       fdmu fdMutex
 
        // immutable until Close
        sysfd       int
@@ -35,9 +29,6 @@ type netFD struct {
        laddr       Addr
        raddr       Addr
 
-       // serialize access to Read and Write methods
-       rio, wio sync.Mutex
-
        // wait server
        pd pollDesc
 }
@@ -84,8 +75,9 @@ func (fd *netFD) name() string {
 }
 
 func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
-       fd.wio.Lock()
-       defer fd.wio.Unlock()
+       // Do not need to call fd.writeLock here,
+       // because fd is not yet accessible to user,
+       // so no concurrent operations are possible.
        if err := fd.pd.PrepareWrite(); err != nil {
                return err
        }
@@ -104,44 +96,69 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
        return nil
 }
 
+func (fd *netFD) destroy() {
+       // Poller may want to unregister fd in readiness notification mechanism,
+       // so this must be executed before closesocket.
+       fd.pd.Close()
+       closesocket(fd.sysfd)
+       fd.sysfd = -1
+       runtime.SetFinalizer(fd, nil)
+}
+
 // Add a reference to this fd.
-// If closing==true, pollDesc must be locked; mark the fd as closing.
 // Returns an error if the fd cannot be used.
-func (fd *netFD) incref(closing bool) error {
-       fd.sysmu.Lock()
-       if fd.closing {
-               fd.sysmu.Unlock()
+func (fd *netFD) incref() error {
+       if !fd.fdmu.Incref() {
                return errClosing
        }
-       fd.sysref++
-       if closing {
-               fd.closing = true
-       }
-       fd.sysmu.Unlock()
        return nil
 }
 
-// Remove a reference to this FD and close if we've been asked to do so (and
-// there are no references left.
+// Remove a reference to this FD and close if we've been asked to do so
+// (and there are no references left).
 func (fd *netFD) decref() {
-       fd.sysmu.Lock()
-       fd.sysref--
-       if fd.closing && fd.sysref == 0 {
-               // Poller may want to unregister fd in readiness notification mechanism,
-               // so this must be executed before closesocket.
-               fd.pd.Close()
-               closesocket(fd.sysfd)
-               fd.sysfd = -1
-               runtime.SetFinalizer(fd, nil)
-       }
-       fd.sysmu.Unlock()
+       if fd.fdmu.Decref() {
+               fd.destroy()
+       }
+}
+
+// Add a reference to this fd and lock for reading.
+// Returns an error if the fd cannot be used.
+func (fd *netFD) readLock() error {
+       if !fd.fdmu.RWLock(true) {
+               return errClosing
+       }
+       return nil
+}
+
+// Unlock for reading and remove a reference to this FD.
+func (fd *netFD) readUnlock() {
+       if fd.fdmu.RWUnlock(true) {
+               fd.destroy()
+       }
+}
+
+// Add a reference to this fd and lock for writing.
+// Returns an error if the fd cannot be used.
+func (fd *netFD) writeLock() error {
+       if !fd.fdmu.RWLock(false) {
+               return errClosing
+       }
+       return nil
+}
+
+// Unlock for writing and remove a reference to this FD.
+func (fd *netFD) writeUnlock() {
+       if fd.fdmu.RWUnlock(false) {
+               fd.destroy()
+       }
 }
 
 func (fd *netFD) Close() error {
        fd.pd.Lock() // needed for both fd.incref(true) and pollDesc.Evict
-       if err := fd.incref(true); err != nil {
+       if !fd.fdmu.IncrefAndClose() {
                fd.pd.Unlock()
-               return err
+               return errClosing
        }
        // Unblock any I/O.  Once it all unblocks and returns,
        // so that it cannot be referring to fd.sysfd anymore,
@@ -158,7 +175,7 @@ func (fd *netFD) Close() error {
 }
 
 func (fd *netFD) shutdown(how int) error {
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
@@ -178,12 +195,10 @@ func (fd *netFD) CloseWrite() error {
 }
 
 func (fd *netFD) Read(p []byte) (n int, err error) {
-       fd.rio.Lock()
-       defer fd.rio.Unlock()
-       if err := fd.incref(false); err != nil {
+       if err := fd.readLock(); err != nil {
                return 0, err
        }
-       defer fd.decref()
+       defer fd.readUnlock()
        if err := fd.pd.PrepareRead(); err != nil {
                return 0, &OpError{"read", fd.net, fd.raddr, err}
        }
@@ -207,12 +222,10 @@ func (fd *netFD) Read(p []byte) (n int, err error) {
 }
 
 func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
-       fd.rio.Lock()
-       defer fd.rio.Unlock()
-       if err := fd.incref(false); err != nil {
+       if err := fd.readLock(); err != nil {
                return 0, nil, err
        }
-       defer fd.decref()
+       defer fd.readUnlock()
        if err := fd.pd.PrepareRead(); err != nil {
                return 0, nil, &OpError{"read", fd.net, fd.laddr, err}
        }
@@ -236,12 +249,10 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
 }
 
 func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) {
-       fd.rio.Lock()
-       defer fd.rio.Unlock()
-       if err := fd.incref(false); err != nil {
+       if err := fd.readLock(); err != nil {
                return 0, 0, 0, nil, err
        }
-       defer fd.decref()
+       defer fd.readUnlock()
        if err := fd.pd.PrepareRead(); err != nil {
                return 0, 0, 0, nil, &OpError{"read", fd.net, fd.laddr, err}
        }
@@ -272,12 +283,10 @@ func chkReadErr(n int, err error, fd *netFD) error {
 }
 
 func (fd *netFD) Write(p []byte) (nn int, err error) {
-       fd.wio.Lock()
-       defer fd.wio.Unlock()
-       if err := fd.incref(false); err != nil {
+       if err := fd.writeLock(); err != nil {
                return 0, err
        }
-       defer fd.decref()
+       defer fd.writeUnlock()
        if err := fd.pd.PrepareWrite(); err != nil {
                return 0, &OpError{"write", fd.net, fd.raddr, err}
        }
@@ -311,12 +320,10 @@ func (fd *netFD) Write(p []byte) (nn int, err error) {
 }
 
 func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
-       fd.wio.Lock()
-       defer fd.wio.Unlock()
-       if err := fd.incref(false); err != nil {
+       if err := fd.writeLock(); err != nil {
                return 0, err
        }
-       defer fd.decref()
+       defer fd.writeUnlock()
        if err := fd.pd.PrepareWrite(); err != nil {
                return 0, &OpError{"write", fd.net, fd.raddr, err}
        }
@@ -338,12 +345,10 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
 }
 
 func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) {
-       fd.wio.Lock()
-       defer fd.wio.Unlock()
-       if err := fd.incref(false); err != nil {
+       if err := fd.writeLock(); err != nil {
                return 0, 0, err
        }
-       defer fd.decref()
+       defer fd.writeUnlock()
        if err := fd.pd.PrepareWrite(); err != nil {
                return 0, 0, &OpError{"write", fd.net, fd.raddr, err}
        }
@@ -366,12 +371,10 @@ func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob
 }
 
 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err error) {
-       fd.rio.Lock()
-       defer fd.rio.Unlock()
-       if err := fd.incref(false); err != nil {
+       if err := fd.readLock(); err != nil {
                return nil, err
        }
-       defer fd.decref()
+       defer fd.readUnlock()
 
        var s int
        var rsa syscall.Sockaddr
index ff3966e433477d9a464ceff8b88334e66e06cfe3..ff0190240b9566cb7c9484da5ca641158258e0a4 100644 (file)
@@ -105,7 +105,6 @@ type operation struct {
        qty        uint32
 
        // fields used only by net package
-       mu     sync.Mutex
        fd     *netFD
        errc   chan error
        buf    syscall.WSABuf
@@ -246,10 +245,8 @@ func startServer() {
 
 // Network file descriptor.
 type netFD struct {
-       // locking/lifetime of sysfd
-       sysmu   sync.Mutex
-       sysref  int
-       closing bool
+       // locking/lifetime of sysfd + serialize access to Read and Write methods
+       fdmu fdMutex
 
        // immutable until Close
        sysfd         syscall.Handle
@@ -313,6 +310,9 @@ func (fd *netFD) setAddr(laddr, raddr Addr) {
 }
 
 func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
+       // Do not need to call fd.writeLock here,
+       // because fd is not yet accessible to user,
+       // so no concurrent operations are possible.
        if !canUseConnectEx(fd.net) {
                return syscall.Connect(fd.sysfd, ra)
        }
@@ -332,8 +332,6 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
        }
        // Call ConnectEx API.
        o := &fd.wop
-       o.mu.Lock()
-       defer o.mu.Unlock()
        o.sa = ra
        _, err := iosrv.ExecIO(o, "ConnectEx", func(o *operation) error {
                return syscall.ConnectEx(o.fd.sysfd, o.sa, nil, 0, nil, &o.o)
@@ -345,64 +343,80 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
        return syscall.Setsockopt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_UPDATE_CONNECT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd)))
 }
 
+func (fd *netFD) destroy() {
+       if fd.sysfd == syscall.InvalidHandle {
+               return
+       }
+       // Poller may want to unregister fd in readiness notification mechanism,
+       // so this must be executed before closesocket.
+       fd.pd.Close()
+       closesocket(fd.sysfd)
+       fd.sysfd = syscall.InvalidHandle
+       // no need for a finalizer anymore
+       runtime.SetFinalizer(fd, nil)
+}
+
 // Add a reference to this fd.
-// If closing==true, mark the fd as closing.
 // Returns an error if the fd cannot be used.
-func (fd *netFD) incref(closing bool) error {
-       if fd == nil {
+func (fd *netFD) incref() error {
+       if !fd.fdmu.Incref() {
                return errClosing
        }
-       fd.sysmu.Lock()
-       if fd.closing {
-               fd.sysmu.Unlock()
-               return errClosing
+       return nil
+}
+
+// Remove a reference to this FD and close if we've been asked to do so
+// (and there are no references left).
+func (fd *netFD) decref() {
+       if fd.fdmu.Decref() {
+               fd.destroy()
        }
-       fd.sysref++
-       if closing {
-               fd.closing = true
+}
+
+// Add a reference to this fd and lock for reading.
+// Returns an error if the fd cannot be used.
+func (fd *netFD) readLock() error {
+       if !fd.fdmu.RWLock(true) {
+               return errClosing
        }
-       closing = fd.closing
-       fd.sysmu.Unlock()
        return nil
 }
 
-// Remove a reference to this FD and close if we've been asked to do so (and
-// there are no references left.
-func (fd *netFD) decref() {
-       if fd == nil {
-               return
+// Unlock for reading and remove a reference to this FD.
+func (fd *netFD) readUnlock() {
+       if fd.fdmu.RWUnlock(true) {
+               fd.destroy()
        }
-       fd.sysmu.Lock()
-       fd.sysref--
-       if fd.closing && fd.sysref == 0 && fd.sysfd != syscall.InvalidHandle {
-               // Poller may want to unregister fd in readiness notification mechanism,
-               // so this must be executed before closesocket.
-               fd.pd.Close()
-               closesocket(fd.sysfd)
-               fd.sysfd = syscall.InvalidHandle
-               // no need for a finalizer anymore
-               runtime.SetFinalizer(fd, nil)
+}
+
+// Add a reference to this fd and lock for writing.
+// Returns an error if the fd cannot be used.
+func (fd *netFD) writeLock() error {
+       if !fd.fdmu.RWLock(false) {
+               return errClosing
+       }
+       return nil
+}
+
+// Unlock for writing and remove a reference to this FD.
+func (fd *netFD) writeUnlock() {
+       if fd.fdmu.RWUnlock(false) {
+               fd.destroy()
        }
-       fd.sysmu.Unlock()
 }
 
 func (fd *netFD) Close() error {
-       if err := fd.incref(true); err != nil {
-               return err
+       if !fd.fdmu.IncrefAndClose() {
+               return errClosing
        }
-       defer fd.decref()
        // unblock pending reader and writer
        fd.pd.Evict()
-       // wait for both reader and writer to exit
-       fd.rop.mu.Lock()
-       fd.wop.mu.Lock()
-       fd.rop.mu.Unlock()
-       fd.wop.mu.Unlock()
+       fd.decref()
        return nil
 }
 
 func (fd *netFD) shutdown(how int) error {
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
@@ -422,13 +436,11 @@ func (fd *netFD) CloseWrite() error {
 }
 
 func (fd *netFD) Read(buf []byte) (int, error) {
-       if err := fd.incref(false); err != nil {
+       if err := fd.readLock(); err != nil {
                return 0, err
        }
-       defer fd.decref()
+       defer fd.readUnlock()
        o := &fd.rop
-       o.mu.Lock()
-       defer o.mu.Unlock()
        o.InitBuf(buf)
        n, err := iosrv.ExecIO(o, "WSARecv", func(o *operation) error {
                return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
@@ -443,13 +455,11 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
        if len(buf) == 0 {
                return 0, nil, nil
        }
-       if err := fd.incref(false); err != nil {
+       if err := fd.readLock(); err != nil {
                return 0, nil, err
        }
-       defer fd.decref()
+       defer fd.readUnlock()
        o := &fd.rop
-       o.mu.Lock()
-       defer o.mu.Unlock()
        o.InitBuf(buf)
        n, err = iosrv.ExecIO(o, "WSARecvFrom", func(o *operation) error {
                if o.rsa == nil {
@@ -466,13 +476,11 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
 }
 
 func (fd *netFD) Write(buf []byte) (int, error) {
-       if err := fd.incref(false); err != nil {
+       if err := fd.writeLock(); err != nil {
                return 0, err
        }
-       defer fd.decref()
+       defer fd.writeUnlock()
        o := &fd.wop
-       o.mu.Lock()
-       defer o.mu.Unlock()
        o.InitBuf(buf)
        return iosrv.ExecIO(o, "WSASend", func(o *operation) error {
                return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil)
@@ -483,13 +491,11 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
        if len(buf) == 0 {
                return 0, nil
        }
-       if err := fd.incref(false); err != nil {
+       if err := fd.writeLock(); err != nil {
                return 0, err
        }
-       defer fd.decref()
+       defer fd.writeUnlock()
        o := &fd.wop
-       o.mu.Lock()
-       defer o.mu.Unlock()
        o.InitBuf(buf)
        o.sa = sa
        return iosrv.ExecIO(o, "WSASendto", func(o *operation) error {
@@ -498,10 +504,10 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
 }
 
 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
-       if err := fd.incref(false); err != nil {
+       if err := fd.readLock(); err != nil {
                return nil, err
        }
-       defer fd.decref()
+       defer fd.readUnlock()
 
        // Get new socket.
        s, err := sysSocket(fd.family, fd.sotype, 0)
@@ -522,8 +528,6 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
 
        // Submit accept request.
        o := &fd.rop
-       o.mu.Lock()
-       defer o.mu.Unlock()
        o.handle = s
        var rawsa [2]syscall.RawSockaddrAny
        o.rsan = int32(unsafe.Sizeof(rawsa[0]))
index dc5b767557b5b166c012ef3648cfce2dd41e4a53..42fe799efbd0e68e5cc54fd50ee480bba652cb03 100644 (file)
@@ -58,12 +58,10 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
                return 0, err, false
        }
 
-       c.wio.Lock()
-       defer c.wio.Unlock()
-       if err := c.incref(false); err != nil {
+       if err := c.writeLock(); err != nil {
                return 0, err, true
        }
-       defer c.decref()
+       defer c.writeUnlock()
 
        dst := c.sysfd
        src := int(f.Fd())
index 6f1323b3dcdc234abb3103cb7b1555603ad1f47f..5e117636a8076d3fe93f041407faa02d8cef8945 100644 (file)
@@ -36,12 +36,10 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
                return 0, nil, false
        }
 
-       c.wio.Lock()
-       defer c.wio.Unlock()
-       if err := c.incref(false); err != nil {
+       if err := c.writeLock(); err != nil {
                return 0, err, true
        }
-       defer c.decref()
+       defer c.writeUnlock()
 
        dst := c.sysfd
        src := int(f.Fd())
index e9b9f91da54aaea646a214528d0835b9bd5e4c8e..0107f679b3822595f20af43a41f4e78b906a461b 100644 (file)
@@ -34,13 +34,12 @@ func sendFile(fd *netFD, r io.Reader) (written int64, err error, handled bool) {
                return 0, nil, false
        }
 
-       if err := fd.incref(false); err != nil {
+       if err := fd.writeLock(); err != nil {
                return 0, err, true
        }
-       defer fd.decref()
+       defer fd.writeUnlock()
+
        o := &fd.wop
-       o.mu.Lock()
-       defer o.mu.Unlock()
        o.qty = uint32(n)
        o.handle = syscall.Handle(f.Fd())
        done, err := iosrv.ExecIO(o, "TransmitFile", func(o *operation) error {
index 886afc2c75e52323c643dbb45ec58879d2985742..da2742c9a4ab0df943a32b9ec102552ab2c28347 100644 (file)
@@ -101,7 +101,7 @@ done:
 }
 
 func setReadBuffer(fd *netFD, bytes int) error {
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
@@ -109,7 +109,7 @@ func setReadBuffer(fd *netFD, bytes int) error {
 }
 
 func setWriteBuffer(fd *netFD, bytes int) error {
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
@@ -117,7 +117,7 @@ func setWriteBuffer(fd *netFD, bytes int) error {
 }
 
 func setKeepAlive(fd *netFD, keepalive bool) error {
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
@@ -133,7 +133,7 @@ func setLinger(fd *netFD, sec int) error {
                l.Onoff = 0
                l.Linger = 0
        }
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
index bcae43c31d4f9c5585e747dc95120dfa3563b347..ca080fd7e4da3c6d57fbb4ce1bb89e5da90ecf20 100644 (file)
@@ -18,7 +18,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error {
        }
        var a [4]byte
        copy(a[:], ip.To4())
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
@@ -26,7 +26,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error {
 }
 
 func setIPv4MulticastLoopback(fd *netFD, v bool) error {
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
index f9cf938d70ca36889ca919b04545f494561caf7b..a69b778e4d121a28386afb8585eca46ffe06779e 100644 (file)
@@ -15,7 +15,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error {
                v = int32(ifi.Index)
        }
        mreq := &syscall.IPMreqn{Ifindex: v}
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
@@ -23,7 +23,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error {
 }
 
 func setIPv4MulticastLoopback(fd *netFD, v bool) error {
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
index c82eef0f5f5d91efea1bf50d62f5dd3260a80fa8..5c2a5872f490c42781c32365696a387e21152162 100644 (file)
@@ -16,7 +16,7 @@ func joinIPv4Group(fd *netFD, ifi *Interface, ip IP) error {
        if err := setIPv4MreqToInterface(mreq, ifi); err != nil {
                return err
        }
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
@@ -28,7 +28,7 @@ func setIPv6MulticastInterface(fd *netFD, ifi *Interface) error {
        if ifi != nil {
                v = ifi.Index
        }
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
@@ -36,7 +36,7 @@ func setIPv6MulticastInterface(fd *netFD, ifi *Interface) error {
 }
 
 func setIPv6MulticastLoopback(fd *netFD, v bool) error {
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
@@ -49,7 +49,7 @@ func joinIPv6Group(fd *netFD, ifi *Interface, ip IP) error {
        if ifi != nil {
                mreq.Interface = uint32(ifi.Index)
        }
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
index fbaf0ed6f45bd15697fce5b8837918f75f92c0ee..7b11f207aaf3b83df0dbb29cebd53ca6263442e9 100644 (file)
@@ -17,7 +17,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error {
        }
        var a [4]byte
        copy(a[:], ip.To4())
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
@@ -25,7 +25,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error {
 }
 
 func setIPv4MulticastLoopback(fd *netFD, v bool) error {
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
index d052a140d74364ed6bf0e482283d2f8f5c81fc2a..33140849c95c56738b1a62ac515b8ab3d3314653 100644 (file)
@@ -14,7 +14,7 @@ import (
 
 // Set keep alive period.
 func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
index 306f4e050d278f9bddebed1165d859cb7f3d25ff..3480f932c8007791cea7d6002cdb8669aec1b3dc 100644 (file)
@@ -14,7 +14,7 @@ import (
 
 // Set keep alive period.
 func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
index afd80644a180b06b98c07cdbf4bced533a4b30a5..8b41b2117dcdba3bb257fc3295210214702900c8 100644 (file)
@@ -12,7 +12,7 @@ import (
 )
 
 func setNoDelay(fd *netFD, noDelay bool) error {
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
index dfc0452d2941fdc574b1dfe28000ab81d860ac4d..fba2acdb601f5022ca5c22eef7bc1ca58e9e72eb 100644 (file)
@@ -14,7 +14,7 @@ import (
 
 // Set keep alive period.
 func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
index 538366d909a216aa75e3a1a1cd376b953ac52e83..0bf4312f248b04befe5acf13ed2784fec466fff0 100644 (file)
@@ -11,7 +11,7 @@ import (
 )
 
 func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
-       if err := fd.incref(false); err != nil {
+       if err := fd.incref(); err != nil {
                return err
        }
        defer fd.decref()
index abf5df10c1115fc5cd2f33357aa9212728465a58..3c7df994751645b9a6721ca59a7d311fa08278a2 100644 (file)
@@ -2019,7 +2019,7 @@ runtime·gc(int32 force)
        if(gcpercent < 0)
                return;
 
-       runtime·semacquire(&runtime·worldsema);
+       runtime·semacquire(&runtime·worldsema, false);
        if(!force && mstats.heap_alloc < mstats.next_gc) {
                // typically threads which lost the race to grab
                // worldsema exit here when gc is done.
@@ -2218,7 +2218,7 @@ runtime·ReadMemStats(MStats *stats)
        // because stoptheworld can only be used by
        // one goroutine at a time, and there might be
        // a pending garbage collection already calling it.
-       runtime·semacquire(&runtime·worldsema);
+       runtime·semacquire(&runtime·worldsema, false);
        m->gcing = 1;
        runtime·stoptheworld();
        updatememstats(nil);
index 0d89a267b9e98abfb26757afdda290f95a8f23ec..6e51ef3eb161050dd0e8289a1b834453e268c4bf 100644 (file)
@@ -447,7 +447,7 @@ func Stack(b Slice, all bool) (n int) {
        pc = (uintptr)runtime·getcallerpc(&b);
 
        if(all) {
-               runtime·semacquire(&runtime·worldsema);
+               runtime·semacquire(&runtime·worldsema, false);
                m->gcing = 1;
                runtime·stoptheworld();
        }
@@ -494,7 +494,7 @@ func GoroutineProfile(b Slice) (n int, ok bool) {
        ok = false;
        n = runtime·gcount();
        if(n <= b.len) {
-               runtime·semacquire(&runtime·worldsema);
+               runtime·semacquire(&runtime·worldsema, false);
                m->gcing = 1;
                runtime·stoptheworld();
 
index ebe6defa00b9761ea3b597d204b925aed10bc0c3..ec6a4113fbcdab84b7e9b0fb1b363db4c42191ce 100644 (file)
@@ -206,6 +206,14 @@ func runtime_pollUnblock(pd *PollDesc) {
                runtime·ready(wg);
 }
 
+func runtime_Semacquire(addr *uint32) {
+       runtime·semacquire(addr, true);
+}
+
+func runtime_Semrelease(addr *uint32) {
+       runtime·semrelease(addr);
+}
+
 uintptr
 runtime·netpollfd(PollDesc *pd)
 {
index 1c39807e00d33d3ee76b9b29b2498fa0ac93d200..95b39b6d5e9480ad19e2fdeb261243daecae6531 100644 (file)
@@ -1836,7 +1836,7 @@ runtime·gomaxprocsfunc(int32 n)
        }
        runtime·unlock(&runtime·sched);
 
-       runtime·semacquire(&runtime·worldsema);
+       runtime·semacquire(&runtime·worldsema, false);
        m->gcing = 1;
        runtime·stoptheworld();
        newprocs = n;
index 557da6f8e357759b723669f7af2b92666e982cd9..875375da2858610a7da239005d0bf53fde800ee6 100644 (file)
@@ -326,7 +326,7 @@ runtime·RaceReleaseMerge(void *addr)
 void
 runtime·RaceSemacquire(uint32 *s)
 {
-       runtime·semacquire(s);
+       runtime·semacquire(s, false);
 }
 
 // func RaceSemrelease(s *uint32)
index 7d04a75424abc97404d8144608ed7b7c8948f43a..a3edb5e9551aabce7bc6d939f9a40c9b57607d0b 100644 (file)
@@ -1021,7 +1021,7 @@ bool      runtime·isInf(float64 f, int32 sign);
 bool   runtime·isNaN(float64 f);
 float64        runtime·ldexp(float64 d, int32 e);
 float64        runtime·modf(float64 d, float64 *ip);
-void   runtime·semacquire(uint32*);
+void   runtime·semacquire(uint32*, bool);
 void   runtime·semrelease(uint32*);
 int32  runtime·gomaxprocsfunc(int32 n);
 void   runtime·procyield(uint32);
index 4df01fc4e497b6531247db1ed61224cb6234190f..05222e2df72159ea45664b36b404741ab8b2aee8 100644 (file)
@@ -98,8 +98,8 @@ cansemacquire(uint32 *addr)
        return 0;
 }
 
-static void
-semacquireimpl(uint32 volatile *addr, int32 profile)
+void
+runtime·semacquire(uint32 volatile *addr, bool profile)
 {
        Sema s; // Needs to be allocated on stack, otherwise garbage collector could deallocate it
        SemaRoot *root;
@@ -144,12 +144,6 @@ semacquireimpl(uint32 volatile *addr, int32 profile)
        }
 }
 
-void
-runtime·semacquire(uint32 volatile *addr)
-{
-       semacquireimpl(addr, 0);
-}
-
 void
 runtime·semrelease(uint32 volatile *addr)
 {
@@ -189,7 +183,7 @@ runtime·semrelease(uint32 volatile *addr)
 }
 
 func runtime_Semacquire(addr *uint32) {
-       semacquireimpl(addr, 1);
+       runtime·semacquire(addr, true);
 }
 
 func runtime_Semrelease(addr *uint32) {