]> Cypherpunks repositories - gostls13.git/commitdiff
net: reduce number of memory allocations during IO operations
authorDmitriy Vyukov <dvyukov@google.com>
Tue, 6 Aug 2013 10:40:10 +0000 (14:40 +0400)
committerDmitriy Vyukov <dvyukov@google.com>
Tue, 6 Aug 2013 10:40:10 +0000 (14:40 +0400)
Embed all data necessary for read/write operations directly into netFD.

benchmark                    old ns/op    new ns/op    delta
BenchmarkTCP4Persistent          27669        23341  -15.64%
BenchmarkTCP4Persistent-2        18173        12558  -30.90%
BenchmarkTCP4Persistent-4        10390         7319  -29.56%

This change will intentionally break all builders to see
how many allocations they do per read/write.
This will be fixed soon afterwards.

R=golang-dev, alex.brainman
CC=golang-dev
https://golang.org/cl/12413043

src/pkg/net/fd_windows.go
src/pkg/net/sendfile_windows.go
src/pkg/net/tcp_test.go
src/pkg/runtime/netpoll_windows.c

index f51d1616e0154234c93550be0eb72761a8cc3f30..9ed99edb4c9770d3d8fee96eae40a32fdb706fb0 100644 (file)
@@ -67,16 +67,8 @@ func resolveAndDial(net, addr string, localAddr Addr, deadline time.Time) (Conn,
        return dial(net, addr, localAddr, ra, deadline)
 }
 
-// Interface for all IO operations.
-type anOpIface interface {
-       Op() *anOp
-       Name() string
-       Submit() error
-}
-
-// anOp implements functionality common to all IO operations.
-// Its beginning must be the same as runtime.net_anOp. Keep these in sync.
-type anOp struct {
+// operation contains superset of data necessary to perform all async IO.
+type operation struct {
        // Used by IOCP interface, it must be first field
        // of the struct, as our code rely on it.
        o syscall.Overlapped
@@ -87,53 +79,34 @@ type anOp struct {
        errno      int32
        qty        uint32
 
-       errnoc chan error
+       // fields used only by net package
+       mu     sync.Mutex
        fd     *netFD
+       errc   chan error
+       buf    syscall.WSABuf
+       sa     syscall.Sockaddr
+       rsa    *syscall.RawSockaddrAny
+       rsan   int32
+       handle syscall.Handle
+       flags  uint32
 }
 
-func (o *anOp) Init(fd *netFD, mode int32) {
-       o.fd = fd
-       o.mode = mode
-       o.runtimeCtx = fd.pd.runtimeCtx
-       if !canCancelIO {
-               var i int
-               if mode == 'r' {
-                       i = 0
-               } else {
-                       i = 1
-               }
-               if fd.errnoc[i] == nil {
-                       fd.errnoc[i] = make(chan error)
-               }
-               o.errnoc = fd.errnoc[i]
-       }
-}
-
-func (o *anOp) Op() *anOp {
-       return o
-}
-
-// bufOp is used by IO operations that read / write
-// data from / to client buffer.
-type bufOp struct {
-       anOp
-       buf syscall.WSABuf
-}
-
-func (o *bufOp) Init(fd *netFD, buf []byte, mode int32) {
-       o.anOp.Init(fd, mode)
+func (o *operation) InitBuf(buf []byte) {
        o.buf.Len = uint32(len(buf))
-       if len(buf) == 0 {
-               o.buf.Buf = nil
-       } else {
+       o.buf.Buf = nil
+       if len(buf) != 0 {
                o.buf.Buf = (*byte)(unsafe.Pointer(&buf[0]))
        }
 }
 
 // ioSrv executes net IO requests.
 type ioSrv struct {
-       submchan chan anOpIface // submit IO requests
-       canchan  chan anOpIface // cancel IO requests
+       req chan ioSrvReq
+}
+
+type ioSrvReq struct {
+       o      *operation
+       submit func(o *operation) error // if nil, cancel the operation
 }
 
 // ProcessRemoteIO will execute submit IO requests on behalf
@@ -144,36 +117,34 @@ type ioSrv struct {
 func (s *ioSrv) ProcessRemoteIO() {
        runtime.LockOSThread()
        defer runtime.UnlockOSThread()
-       for {
-               select {
-               case o := <-s.submchan:
-                       o.Op().errnoc <- o.Submit()
-               case o := <-s.canchan:
-                       o.Op().errnoc <- syscall.CancelIo(syscall.Handle(o.Op().fd.sysfd))
+       for r := range s.req {
+               if r.submit != nil {
+                       r.o.errc <- r.submit(r.o)
+               } else {
+                       r.o.errc <- syscall.CancelIo(r.o.fd.sysfd)
                }
        }
 }
 
-// ExecIO executes a single IO operation oi. It submits and cancels
+// ExecIO executes a single IO operation o. It submits and cancels
 // IO in the current thread for systems where Windows CancelIoEx API
 // is available. Alternatively, it passes the request onto
 // runtime netpoll and waits for completion or cancels request.
-func (s *ioSrv) ExecIO(oi anOpIface) (int, error) {
-       var err error
-       o := oi.Op()
+func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) error) (int, error) {
+       fd := o.fd
        // Notify runtime netpoll about starting IO.
-       err = o.fd.pd.Prepare(int(o.mode))
+       err := fd.pd.Prepare(int(o.mode))
        if err != nil {
-               return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
+               return 0, &OpError{name, fd.net, fd.laddr, err}
        }
        // Start IO.
        if canCancelIO {
-               err = oi.Submit()
+               err = submit(o)
        } else {
                // Send request to a special dedicated thread,
                // so it can stop the IO with CancelIO later.
-               s.submchan <- oi
-               err = <-o.errnoc
+               s.req <- ioSrvReq{o, submit}
+               err = <-o.errc
        }
        switch err {
        case nil:
@@ -182,15 +153,15 @@ func (s *ioSrv) ExecIO(oi anOpIface) (int, error) {
                // IO started, and we have to wait for its completion.
                err = nil
        default:
-               return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
+               return 0, &OpError{name, fd.net, fd.laddr, err}
        }
        // Wait for our request to complete.
-       err = o.fd.pd.Wait(int(o.mode))
+       err = fd.pd.Wait(int(o.mode))
        if err == nil {
                // All is good. Extract our IO results and return.
                if o.errno != 0 {
                        err = syscall.Errno(o.errno)
-                       return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
+                       return 0, &OpError{name, fd.net, fd.laddr, err}
                }
                return int(o.qty), nil
        }
@@ -204,24 +175,24 @@ func (s *ioSrv) ExecIO(oi anOpIface) (int, error) {
        }
        // Cancel our request.
        if canCancelIO {
-               err := syscall.CancelIoEx(syscall.Handle(o.Op().fd.sysfd), &o.o)
+               err := syscall.CancelIoEx(fd.sysfd, &o.o)
                // Assuming ERROR_NOT_FOUND is returned, if IO is completed.
                if err != nil && err != syscall.ERROR_NOT_FOUND {
                        // TODO(brainman): maybe do something else, but panic.
                        panic(err)
                }
        } else {
-               s.canchan <- oi
-               <-o.errnoc
+               s.req <- ioSrvReq{o, nil}
+               <-o.errc
        }
        // Wait for cancellation to complete.
-       o.fd.pd.WaitCanceled(int(o.mode))
+       fd.pd.WaitCanceled(int(o.mode))
        if o.errno != 0 {
                err = syscall.Errno(o.errno)
                if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled
                        err = netpollErr
                }
-               return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
+               return 0, &OpError{name, fd.net, fd.laddr, err}
        }
        // We issued cancellation request. But, it seems, IO operation succeeded
        // before cancellation request run. We need to treat IO operation as
@@ -238,8 +209,7 @@ func startServer() {
        if !canCancelIO {
                // Only CancelIo API is available. Lets start special goroutine
                // locked to an OS thread, that both starts and cancels IO.
-               iosrv.submchan = make(chan anOpIface)
-               iosrv.canchan = make(chan anOpIface)
+               iosrv.req = make(chan ioSrvReq)
                go iosrv.ProcessRemoteIO()
        }
 }
@@ -259,30 +229,39 @@ type netFD struct {
        net         string
        laddr       Addr
        raddr       Addr
-       errnoc      [2]chan error // read/write submit or cancel operation errors
 
-       // serialize access to Read and Write methods
-       rio, wio sync.Mutex
+       rop operation // read operation
+       wop operation // write operation
 
        // wait server
        pd pollDesc
 }
 
-func newFD(fd syscall.Handle, family, sotype int, net string) (*netFD, error) {
+func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) {
        if initErr != nil {
                return nil, initErr
        }
        onceStartServer.Do(startServer)
-       netfd := &netFD{
-               sysfd:  fd,
+       fd := &netFD{
+               sysfd:  sysfd,
                family: family,
                sotype: sotype,
                net:    net,
        }
-       if err := netfd.pd.Init(netfd); err != nil {
+       if err := fd.pd.Init(fd); err != nil {
                return nil, err
        }
-       return netfd, nil
+       fd.rop.mode = 'r'
+       fd.wop.mode = 'w'
+       fd.rop.fd = fd
+       fd.wop.fd = fd
+       fd.rop.runtimeCtx = fd.pd.runtimeCtx
+       fd.wop.runtimeCtx = fd.pd.runtimeCtx
+       if !canCancelIO {
+               fd.rop.errc = make(chan error)
+               fd.rop.errc = make(chan error)
+       }
+       return fd, nil
 }
 
 func (fd *netFD) setAddr(laddr, raddr Addr) {
@@ -291,21 +270,6 @@ func (fd *netFD) setAddr(laddr, raddr Addr) {
        runtime.SetFinalizer(fd, (*netFD).Close)
 }
 
-// Make new connection.
-
-type connectOp struct {
-       anOp
-       ra syscall.Sockaddr
-}
-
-func (o *connectOp) Submit() error {
-       return syscall.ConnectEx(o.fd.sysfd, o.ra, nil, 0, nil, &o.o)
-}
-
-func (o *connectOp) Name() string {
-       return "ConnectEx"
-}
-
 func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
        if !canUseConnectEx(fd.net) {
                return syscall.Connect(fd.sysfd, ra)
@@ -325,10 +289,13 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
                }
        }
        // Call ConnectEx API.
-       var o connectOp
-       o.Init(fd, 'w')
-       o.ra = ra
-       _, err := iosrv.ExecIO(&o)
+       o := &fd.wop
+       o.mu.Lock()
+       defer o.mu.Unlock()
+       o.sa = ra
+       _, err := iosrv.ExecIO(o, "ConnectEx", func(o *operation) error {
+               return syscall.ConnectEx(o.fd.sysfd, o.sa, nil, 0, nil, &o.o)
+       })
        if err != nil {
                return err
        }
@@ -385,10 +352,10 @@ func (fd *netFD) Close() error {
        // unblock pending reader and writer
        fd.pd.Evict()
        // wait for both reader and writer to exit
-       fd.rio.Lock()
-       defer fd.rio.Unlock()
-       fd.wio.Lock()
-       defer fd.wio.Unlock()
+       fd.rop.mu.Lock()
+       fd.wop.mu.Lock()
+       fd.rop.mu.Unlock()
+       fd.wop.mu.Unlock()
        return nil
 }
 
@@ -412,54 +379,24 @@ func (fd *netFD) CloseWrite() error {
        return fd.shutdown(syscall.SHUT_WR)
 }
 
-// Read from network.
-
-type readOp struct {
-       bufOp
-}
-
-func (o *readOp) Submit() error {
-       var d, f uint32
-       return syscall.WSARecv(syscall.Handle(o.fd.sysfd), &o.buf, 1, &d, &f, &o.o, nil)
-}
-
-func (o *readOp) Name() string {
-       return "WSARecv"
-}
-
 func (fd *netFD) Read(buf []byte) (int, error) {
        if err := fd.incref(false); err != nil {
                return 0, err
        }
        defer fd.decref()
-       fd.rio.Lock()
-       defer fd.rio.Unlock()
-       var o readOp
-       o.Init(fd, buf, 'r')
-       n, err := iosrv.ExecIO(&o)
+       o := &fd.rop
+       o.mu.Lock()
+       defer o.mu.Unlock()
+       o.InitBuf(buf)
+       n, err := iosrv.ExecIO(o, "WSARecv", func(o *operation) error {
+               return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
+       })
        if err == nil && n == 0 {
                err = io.EOF
        }
        return n, err
 }
 
-// ReadFrom from network.
-
-type readFromOp struct {
-       bufOp
-       rsa  syscall.RawSockaddrAny
-       rsan int32
-}
-
-func (o *readFromOp) Submit() error {
-       var d, f uint32
-       return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &d, &f, &o.rsa, &o.rsan, &o.o, nil)
-}
-
-func (o *readFromOp) Name() string {
-       return "WSARecvFrom"
-}
-
 func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
        if len(buf) == 0 {
                return 0, nil, nil
@@ -468,12 +405,17 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
                return 0, nil, err
        }
        defer fd.decref()
-       fd.rio.Lock()
-       defer fd.rio.Unlock()
-       var o readFromOp
-       o.Init(fd, buf, 'r')
-       o.rsan = int32(unsafe.Sizeof(o.rsa))
-       n, err = iosrv.ExecIO(&o)
+       o := &fd.rop
+       o.mu.Lock()
+       defer o.mu.Unlock()
+       o.InitBuf(buf)
+       n, err = iosrv.ExecIO(o, "WSARecvFrom", func(o *operation) error {
+               if o.rsa == nil {
+                       o.rsa = new(syscall.RawSockaddrAny)
+               }
+               o.rsan = int32(unsafe.Sizeof(*o.rsa))
+               return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, o.rsa, &o.rsan, &o.o, nil)
+       })
        if err != nil {
                return 0, nil, err
        }
@@ -481,47 +423,18 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
        return
 }
 
-// Write to network.
-
-type writeOp struct {
-       bufOp
-}
-
-func (o *writeOp) Submit() error {
-       var d uint32
-       return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &d, 0, &o.o, nil)
-}
-
-func (o *writeOp) Name() string {
-       return "WSASend"
-}
-
 func (fd *netFD) Write(buf []byte) (int, error) {
        if err := fd.incref(false); err != nil {
                return 0, err
        }
        defer fd.decref()
-       fd.wio.Lock()
-       defer fd.wio.Unlock()
-       var o writeOp
-       o.Init(fd, buf, 'w')
-       return iosrv.ExecIO(&o)
-}
-
-// WriteTo to network.
-
-type writeToOp struct {
-       bufOp
-       sa syscall.Sockaddr
-}
-
-func (o *writeToOp) Submit() error {
-       var d uint32
-       return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &d, 0, o.sa, &o.o, nil)
-}
-
-func (o *writeToOp) Name() string {
-       return "WSASendto"
+       o := &fd.wop
+       o.mu.Lock()
+       defer o.mu.Unlock()
+       o.InitBuf(buf)
+       return iosrv.ExecIO(o, "WSASend", func(o *operation) error {
+               return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil)
+       })
 }
 
 func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
@@ -532,31 +445,14 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
                return 0, err
        }
        defer fd.decref()
-       fd.wio.Lock()
-       defer fd.wio.Unlock()
-       var o writeToOp
-       o.Init(fd, buf, 'w')
+       o := &fd.wop
+       o.mu.Lock()
+       defer o.mu.Unlock()
+       o.InitBuf(buf)
        o.sa = sa
-       return iosrv.ExecIO(&o)
-}
-
-// Accept new network connections.
-
-type acceptOp struct {
-       anOp
-       newsock syscall.Handle
-       attrs   [2]syscall.RawSockaddrAny // space for local and remote address only
-}
-
-func (o *acceptOp) Submit() error {
-       var d uint32
-       l := uint32(unsafe.Sizeof(o.attrs[0]))
-       return syscall.AcceptEx(o.fd.sysfd, o.newsock,
-               (*byte)(unsafe.Pointer(&o.attrs[0])), 0, l, l, &d, &o.o)
-}
-
-func (o *acceptOp) Name() string {
-       return "AcceptEx"
+       return iosrv.ExecIO(o, "WSASendto", func(o *operation) error {
+               return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil)
+       })
 }
 
 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
@@ -579,12 +475,15 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
        }
 
        // Submit accept request.
-       fd.rio.Lock()
-       defer fd.rio.Unlock()
-       var o acceptOp
-       o.Init(fd, 'r')
-       o.newsock = s
-       _, err = iosrv.ExecIO(&o)
+       o := &fd.rop
+       o.mu.Lock()
+       defer o.mu.Unlock()
+       o.handle = s
+       var rawsa [2]syscall.RawSockaddrAny
+       o.rsan = int32(unsafe.Sizeof(rawsa[0]))
+       _, err = iosrv.ExecIO(o, "AcceptEx", func(o *operation) error {
+               return syscall.AcceptEx(o.fd.sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o)
+       })
        if err != nil {
                netfd.Close()
                return nil, err
@@ -600,9 +499,8 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
        // Get local and peer addr out of AcceptEx buffer.
        var lrsa, rrsa *syscall.RawSockaddrAny
        var llen, rlen int32
-       l := uint32(unsafe.Sizeof(*lrsa))
-       syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&o.attrs[0])),
-               0, l, l, &lrsa, &llen, &rrsa, &rlen)
+       syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&rawsa[0])),
+               0, uint32(o.rsan), uint32(o.rsan), &lrsa, &llen, &rrsa, &rlen)
        lsa, _ := lrsa.Sockaddr()
        rsa, _ := rrsa.Sockaddr()
 
index 5012583b2cfbc62268dba2a6b3092c304ac9ecf0..e9b9f91da54aaea646a214528d0835b9bd5e4c8e 100644 (file)
@@ -10,20 +10,6 @@ import (
        "syscall"
 )
 
-type sendfileOp struct {
-       anOp
-       src syscall.Handle // source
-       n   uint32
-}
-
-func (o *sendfileOp) Submit() (err error) {
-       return syscall.TransmitFile(o.fd.sysfd, o.src, o.n, 0, &o.o, nil, syscall.TF_WRITE_BEHIND)
-}
-
-func (o *sendfileOp) Name() string {
-       return "TransmitFile"
-}
-
 // sendFile copies the contents of r to c using the TransmitFile
 // system call to minimize copies.
 //
@@ -33,7 +19,7 @@ func (o *sendfileOp) Name() string {
 // if handled == false, sendFile performed no work.
 //
 // Note that sendfile for windows does not suppport >2GB file.
-func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
+func sendFile(fd *netFD, r io.Reader) (written int64, err error, handled bool) {
        var n int64 = 0 // by default, copy until EOF
 
        lr, ok := r.(*io.LimitedReader)
@@ -48,18 +34,18 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
                return 0, nil, false
        }
 
-       if err := c.incref(false); err != nil {
+       if err := fd.incref(false); err != nil {
                return 0, err, true
        }
-       defer c.decref()
-       c.wio.Lock()
-       defer c.wio.Unlock()
-
-       var o sendfileOp
-       o.Init(c, 'w')
-       o.n = uint32(n)
-       o.src = syscall.Handle(f.Fd())
-       done, err := iosrv.ExecIO(&o)
+       defer fd.decref()
+       o := &fd.wop
+       o.mu.Lock()
+       defer o.mu.Unlock()
+       o.qty = uint32(n)
+       o.handle = syscall.Handle(f.Fd())
+       done, err := iosrv.ExecIO(o, "TransmitFile", func(o *operation) error {
+               return syscall.TransmitFile(o.fd.sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND)
+       })
        if err != nil {
                return 0, err, false
        }
index f356f92f0967afd6ec1855dbc0fc8bfa7747f85f..dedd41df9489774064a54fbbbc746e1bd6e079c1 100644 (file)
@@ -6,6 +6,7 @@ package net
 
 import (
        "fmt"
+       "io"
        "reflect"
        "runtime"
        "sync"
@@ -327,3 +328,46 @@ func TestTCPConcurrentAccept(t *testing.T) {
        ln.Close()
        wg.Wait()
 }
+
+func TestTCPReadWriteMallocs(t *testing.T) {
+       maxMallocs := 0
+       switch runtime.GOOS {
+       // Add other OSes if you know how many mallocs they do.
+       case "windows":
+               maxMallocs = 0
+       }
+       ln, err := Listen("tcp", "127.0.0.1:0")
+       if err != nil {
+               t.Fatalf("Listen failed: %v", err)
+       }
+       defer ln.Close()
+       var server Conn
+       errc := make(chan error)
+       go func() {
+               var err error
+               server, err = ln.Accept()
+               errc <- err
+       }()
+       client, err := Dial("tcp", ln.Addr().String())
+       if err != nil {
+               t.Fatalf("Dial failed: %v", err)
+       }
+       if err := <-errc; err != nil {
+               t.Fatalf("Accept failed: %v", err)
+       }
+       defer server.Close()
+       var buf [128]byte
+       mallocs := testing.AllocsPerRun(1000, func() {
+               _, err := server.Write(buf[:])
+               if err != nil {
+                       t.Fatalf("Write failed: %v", err)
+               }
+               _, err = io.ReadFull(client, buf[:])
+               if err != nil {
+                       t.Fatalf("Read failed: %v", err)
+               }
+       })
+       if int(mallocs) > maxMallocs {
+               t.Fatalf("Got %v allocs, want %v", mallocs, maxMallocs)
+       }
+}
index e2153c540eab46574a64d3738bf03ac4b460c1c7..7a95380a334a06be67a6ced93946d2f3b988641f 100644 (file)
@@ -16,9 +16,9 @@ extern void *runtime·GetQueuedCompletionStatus;
 
 #define INVALID_HANDLE_VALUE ((uintptr)-1)
 
-// net_anOp must be the same as beginning of net.anOp. Keep these in sync.
-typedef struct net_anOp net_anOp;
-struct net_anOp
+// net_op must be the same as beginning of net.operation. Keep these in sync.
+typedef struct net_op net_op;
+struct net_op
 {
        // used by windows
        Overlapped      o;
@@ -66,7 +66,7 @@ runtime·netpoll(bool block)
 {
        uint32 wait, qty, key;
        int32 mode, errno;
-       net_anOp *o;
+       net_op *o;
        G *gp;
 
        if(iocphandle == INVALID_HANDLE_VALUE)