From: Dmitriy Vyukov Date: Tue, 6 Aug 2013 10:40:10 +0000 (+0400) Subject: net: reduce number of memory allocations during IO operations X-Git-Tag: go1.2rc2~768 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=04b1cfa94635f18462b8a076cebacc5e08d92631;p=gostls13.git net: reduce number of memory allocations during IO operations Embed all data necessary for read/write operations directly into netFD. benchmark old ns/op new ns/op delta BenchmarkTCP4Persistent 27669 23341 -15.64% BenchmarkTCP4Persistent-2 18173 12558 -30.90% BenchmarkTCP4Persistent-4 10390 7319 -29.56% This change will intentionally break all builders to see how many allocations they do per read/write. This will be fixed soon afterwards. R=golang-dev, alex.brainman CC=golang-dev https://golang.org/cl/12413043 --- diff --git a/src/pkg/net/fd_windows.go b/src/pkg/net/fd_windows.go index f51d1616e0..9ed99edb4c 100644 --- a/src/pkg/net/fd_windows.go +++ b/src/pkg/net/fd_windows.go @@ -67,16 +67,8 @@ func resolveAndDial(net, addr string, localAddr Addr, deadline time.Time) (Conn, return dial(net, addr, localAddr, ra, deadline) } -// Interface for all IO operations. -type anOpIface interface { - Op() *anOp - Name() string - Submit() error -} - -// anOp implements functionality common to all IO operations. -// Its beginning must be the same as runtime.net_anOp. Keep these in sync. -type anOp struct { +// operation contains superset of data necessary to perform all async IO. +type operation struct { // Used by IOCP interface, it must be first field // of the struct, as our code rely on it. o syscall.Overlapped @@ -87,53 +79,34 @@ type anOp struct { errno int32 qty uint32 - errnoc chan error + // fields used only by net package + mu sync.Mutex fd *netFD + errc chan error + buf syscall.WSABuf + sa syscall.Sockaddr + rsa *syscall.RawSockaddrAny + rsan int32 + handle syscall.Handle + flags uint32 } -func (o *anOp) Init(fd *netFD, mode int32) { - o.fd = fd - o.mode = mode - o.runtimeCtx = fd.pd.runtimeCtx - if !canCancelIO { - var i int - if mode == 'r' { - i = 0 - } else { - i = 1 - } - if fd.errnoc[i] == nil { - fd.errnoc[i] = make(chan error) - } - o.errnoc = fd.errnoc[i] - } -} - -func (o *anOp) Op() *anOp { - return o -} - -// bufOp is used by IO operations that read / write -// data from / to client buffer. -type bufOp struct { - anOp - buf syscall.WSABuf -} - -func (o *bufOp) Init(fd *netFD, buf []byte, mode int32) { - o.anOp.Init(fd, mode) +func (o *operation) InitBuf(buf []byte) { o.buf.Len = uint32(len(buf)) - if len(buf) == 0 { - o.buf.Buf = nil - } else { + o.buf.Buf = nil + if len(buf) != 0 { o.buf.Buf = (*byte)(unsafe.Pointer(&buf[0])) } } // ioSrv executes net IO requests. type ioSrv struct { - submchan chan anOpIface // submit IO requests - canchan chan anOpIface // cancel IO requests + req chan ioSrvReq +} + +type ioSrvReq struct { + o *operation + submit func(o *operation) error // if nil, cancel the operation } // ProcessRemoteIO will execute submit IO requests on behalf @@ -144,36 +117,34 @@ type ioSrv struct { func (s *ioSrv) ProcessRemoteIO() { runtime.LockOSThread() defer runtime.UnlockOSThread() - for { - select { - case o := <-s.submchan: - o.Op().errnoc <- o.Submit() - case o := <-s.canchan: - o.Op().errnoc <- syscall.CancelIo(syscall.Handle(o.Op().fd.sysfd)) + for r := range s.req { + if r.submit != nil { + r.o.errc <- r.submit(r.o) + } else { + r.o.errc <- syscall.CancelIo(r.o.fd.sysfd) } } } -// ExecIO executes a single IO operation oi. It submits and cancels +// ExecIO executes a single IO operation o. It submits and cancels // IO in the current thread for systems where Windows CancelIoEx API // is available. Alternatively, it passes the request onto // runtime netpoll and waits for completion or cancels request. -func (s *ioSrv) ExecIO(oi anOpIface) (int, error) { - var err error - o := oi.Op() +func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) error) (int, error) { + fd := o.fd // Notify runtime netpoll about starting IO. - err = o.fd.pd.Prepare(int(o.mode)) + err := fd.pd.Prepare(int(o.mode)) if err != nil { - return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err} + return 0, &OpError{name, fd.net, fd.laddr, err} } // Start IO. if canCancelIO { - err = oi.Submit() + err = submit(o) } else { // Send request to a special dedicated thread, // so it can stop the IO with CancelIO later. - s.submchan <- oi - err = <-o.errnoc + s.req <- ioSrvReq{o, submit} + err = <-o.errc } switch err { case nil: @@ -182,15 +153,15 @@ func (s *ioSrv) ExecIO(oi anOpIface) (int, error) { // IO started, and we have to wait for its completion. err = nil default: - return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err} + return 0, &OpError{name, fd.net, fd.laddr, err} } // Wait for our request to complete. - err = o.fd.pd.Wait(int(o.mode)) + err = fd.pd.Wait(int(o.mode)) if err == nil { // All is good. Extract our IO results and return. if o.errno != 0 { err = syscall.Errno(o.errno) - return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err} + return 0, &OpError{name, fd.net, fd.laddr, err} } return int(o.qty), nil } @@ -204,24 +175,24 @@ func (s *ioSrv) ExecIO(oi anOpIface) (int, error) { } // Cancel our request. if canCancelIO { - err := syscall.CancelIoEx(syscall.Handle(o.Op().fd.sysfd), &o.o) + err := syscall.CancelIoEx(fd.sysfd, &o.o) // Assuming ERROR_NOT_FOUND is returned, if IO is completed. if err != nil && err != syscall.ERROR_NOT_FOUND { // TODO(brainman): maybe do something else, but panic. panic(err) } } else { - s.canchan <- oi - <-o.errnoc + s.req <- ioSrvReq{o, nil} + <-o.errc } // Wait for cancellation to complete. - o.fd.pd.WaitCanceled(int(o.mode)) + fd.pd.WaitCanceled(int(o.mode)) if o.errno != 0 { err = syscall.Errno(o.errno) if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled err = netpollErr } - return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err} + return 0, &OpError{name, fd.net, fd.laddr, err} } // We issued cancellation request. But, it seems, IO operation succeeded // before cancellation request run. We need to treat IO operation as @@ -238,8 +209,7 @@ func startServer() { if !canCancelIO { // Only CancelIo API is available. Lets start special goroutine // locked to an OS thread, that both starts and cancels IO. - iosrv.submchan = make(chan anOpIface) - iosrv.canchan = make(chan anOpIface) + iosrv.req = make(chan ioSrvReq) go iosrv.ProcessRemoteIO() } } @@ -259,30 +229,39 @@ type netFD struct { net string laddr Addr raddr Addr - errnoc [2]chan error // read/write submit or cancel operation errors - // serialize access to Read and Write methods - rio, wio sync.Mutex + rop operation // read operation + wop operation // write operation // wait server pd pollDesc } -func newFD(fd syscall.Handle, family, sotype int, net string) (*netFD, error) { +func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) { if initErr != nil { return nil, initErr } onceStartServer.Do(startServer) - netfd := &netFD{ - sysfd: fd, + fd := &netFD{ + sysfd: sysfd, family: family, sotype: sotype, net: net, } - if err := netfd.pd.Init(netfd); err != nil { + if err := fd.pd.Init(fd); err != nil { return nil, err } - return netfd, nil + fd.rop.mode = 'r' + fd.wop.mode = 'w' + fd.rop.fd = fd + fd.wop.fd = fd + fd.rop.runtimeCtx = fd.pd.runtimeCtx + fd.wop.runtimeCtx = fd.pd.runtimeCtx + if !canCancelIO { + fd.rop.errc = make(chan error) + fd.rop.errc = make(chan error) + } + return fd, nil } func (fd *netFD) setAddr(laddr, raddr Addr) { @@ -291,21 +270,6 @@ func (fd *netFD) setAddr(laddr, raddr Addr) { runtime.SetFinalizer(fd, (*netFD).Close) } -// Make new connection. - -type connectOp struct { - anOp - ra syscall.Sockaddr -} - -func (o *connectOp) Submit() error { - return syscall.ConnectEx(o.fd.sysfd, o.ra, nil, 0, nil, &o.o) -} - -func (o *connectOp) Name() string { - return "ConnectEx" -} - func (fd *netFD) connect(la, ra syscall.Sockaddr) error { if !canUseConnectEx(fd.net) { return syscall.Connect(fd.sysfd, ra) @@ -325,10 +289,13 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error { } } // Call ConnectEx API. - var o connectOp - o.Init(fd, 'w') - o.ra = ra - _, err := iosrv.ExecIO(&o) + o := &fd.wop + o.mu.Lock() + defer o.mu.Unlock() + o.sa = ra + _, err := iosrv.ExecIO(o, "ConnectEx", func(o *operation) error { + return syscall.ConnectEx(o.fd.sysfd, o.sa, nil, 0, nil, &o.o) + }) if err != nil { return err } @@ -385,10 +352,10 @@ func (fd *netFD) Close() error { // unblock pending reader and writer fd.pd.Evict() // wait for both reader and writer to exit - fd.rio.Lock() - defer fd.rio.Unlock() - fd.wio.Lock() - defer fd.wio.Unlock() + fd.rop.mu.Lock() + fd.wop.mu.Lock() + fd.rop.mu.Unlock() + fd.wop.mu.Unlock() return nil } @@ -412,54 +379,24 @@ func (fd *netFD) CloseWrite() error { return fd.shutdown(syscall.SHUT_WR) } -// Read from network. - -type readOp struct { - bufOp -} - -func (o *readOp) Submit() error { - var d, f uint32 - return syscall.WSARecv(syscall.Handle(o.fd.sysfd), &o.buf, 1, &d, &f, &o.o, nil) -} - -func (o *readOp) Name() string { - return "WSARecv" -} - func (fd *netFD) Read(buf []byte) (int, error) { if err := fd.incref(false); err != nil { return 0, err } defer fd.decref() - fd.rio.Lock() - defer fd.rio.Unlock() - var o readOp - o.Init(fd, buf, 'r') - n, err := iosrv.ExecIO(&o) + o := &fd.rop + o.mu.Lock() + defer o.mu.Unlock() + o.InitBuf(buf) + n, err := iosrv.ExecIO(o, "WSARecv", func(o *operation) error { + return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil) + }) if err == nil && n == 0 { err = io.EOF } return n, err } -// ReadFrom from network. - -type readFromOp struct { - bufOp - rsa syscall.RawSockaddrAny - rsan int32 -} - -func (o *readFromOp) Submit() error { - var d, f uint32 - return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &d, &f, &o.rsa, &o.rsan, &o.o, nil) -} - -func (o *readFromOp) Name() string { - return "WSARecvFrom" -} - func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) { if len(buf) == 0 { return 0, nil, nil @@ -468,12 +405,17 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) { return 0, nil, err } defer fd.decref() - fd.rio.Lock() - defer fd.rio.Unlock() - var o readFromOp - o.Init(fd, buf, 'r') - o.rsan = int32(unsafe.Sizeof(o.rsa)) - n, err = iosrv.ExecIO(&o) + o := &fd.rop + o.mu.Lock() + defer o.mu.Unlock() + o.InitBuf(buf) + n, err = iosrv.ExecIO(o, "WSARecvFrom", func(o *operation) error { + if o.rsa == nil { + o.rsa = new(syscall.RawSockaddrAny) + } + o.rsan = int32(unsafe.Sizeof(*o.rsa)) + return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, o.rsa, &o.rsan, &o.o, nil) + }) if err != nil { return 0, nil, err } @@ -481,47 +423,18 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) { return } -// Write to network. - -type writeOp struct { - bufOp -} - -func (o *writeOp) Submit() error { - var d uint32 - return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &d, 0, &o.o, nil) -} - -func (o *writeOp) Name() string { - return "WSASend" -} - func (fd *netFD) Write(buf []byte) (int, error) { if err := fd.incref(false); err != nil { return 0, err } defer fd.decref() - fd.wio.Lock() - defer fd.wio.Unlock() - var o writeOp - o.Init(fd, buf, 'w') - return iosrv.ExecIO(&o) -} - -// WriteTo to network. - -type writeToOp struct { - bufOp - sa syscall.Sockaddr -} - -func (o *writeToOp) Submit() error { - var d uint32 - return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &d, 0, o.sa, &o.o, nil) -} - -func (o *writeToOp) Name() string { - return "WSASendto" + o := &fd.wop + o.mu.Lock() + defer o.mu.Unlock() + o.InitBuf(buf) + return iosrv.ExecIO(o, "WSASend", func(o *operation) error { + return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil) + }) } func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { @@ -532,31 +445,14 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { return 0, err } defer fd.decref() - fd.wio.Lock() - defer fd.wio.Unlock() - var o writeToOp - o.Init(fd, buf, 'w') + o := &fd.wop + o.mu.Lock() + defer o.mu.Unlock() + o.InitBuf(buf) o.sa = sa - return iosrv.ExecIO(&o) -} - -// Accept new network connections. - -type acceptOp struct { - anOp - newsock syscall.Handle - attrs [2]syscall.RawSockaddrAny // space for local and remote address only -} - -func (o *acceptOp) Submit() error { - var d uint32 - l := uint32(unsafe.Sizeof(o.attrs[0])) - return syscall.AcceptEx(o.fd.sysfd, o.newsock, - (*byte)(unsafe.Pointer(&o.attrs[0])), 0, l, l, &d, &o.o) -} - -func (o *acceptOp) Name() string { - return "AcceptEx" + return iosrv.ExecIO(o, "WSASendto", func(o *operation) error { + return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil) + }) } func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { @@ -579,12 +475,15 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { } // Submit accept request. - fd.rio.Lock() - defer fd.rio.Unlock() - var o acceptOp - o.Init(fd, 'r') - o.newsock = s - _, err = iosrv.ExecIO(&o) + o := &fd.rop + o.mu.Lock() + defer o.mu.Unlock() + o.handle = s + var rawsa [2]syscall.RawSockaddrAny + o.rsan = int32(unsafe.Sizeof(rawsa[0])) + _, err = iosrv.ExecIO(o, "AcceptEx", func(o *operation) error { + return syscall.AcceptEx(o.fd.sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o) + }) if err != nil { netfd.Close() return nil, err @@ -600,9 +499,8 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { // Get local and peer addr out of AcceptEx buffer. var lrsa, rrsa *syscall.RawSockaddrAny var llen, rlen int32 - l := uint32(unsafe.Sizeof(*lrsa)) - syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&o.attrs[0])), - 0, l, l, &lrsa, &llen, &rrsa, &rlen) + syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&rawsa[0])), + 0, uint32(o.rsan), uint32(o.rsan), &lrsa, &llen, &rrsa, &rlen) lsa, _ := lrsa.Sockaddr() rsa, _ := rrsa.Sockaddr() diff --git a/src/pkg/net/sendfile_windows.go b/src/pkg/net/sendfile_windows.go index 5012583b2c..e9b9f91da5 100644 --- a/src/pkg/net/sendfile_windows.go +++ b/src/pkg/net/sendfile_windows.go @@ -10,20 +10,6 @@ import ( "syscall" ) -type sendfileOp struct { - anOp - src syscall.Handle // source - n uint32 -} - -func (o *sendfileOp) Submit() (err error) { - return syscall.TransmitFile(o.fd.sysfd, o.src, o.n, 0, &o.o, nil, syscall.TF_WRITE_BEHIND) -} - -func (o *sendfileOp) Name() string { - return "TransmitFile" -} - // sendFile copies the contents of r to c using the TransmitFile // system call to minimize copies. // @@ -33,7 +19,7 @@ func (o *sendfileOp) Name() string { // if handled == false, sendFile performed no work. // // Note that sendfile for windows does not suppport >2GB file. -func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { +func sendFile(fd *netFD, r io.Reader) (written int64, err error, handled bool) { var n int64 = 0 // by default, copy until EOF lr, ok := r.(*io.LimitedReader) @@ -48,18 +34,18 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { return 0, nil, false } - if err := c.incref(false); err != nil { + if err := fd.incref(false); err != nil { return 0, err, true } - defer c.decref() - c.wio.Lock() - defer c.wio.Unlock() - - var o sendfileOp - o.Init(c, 'w') - o.n = uint32(n) - o.src = syscall.Handle(f.Fd()) - done, err := iosrv.ExecIO(&o) + defer fd.decref() + o := &fd.wop + o.mu.Lock() + defer o.mu.Unlock() + o.qty = uint32(n) + o.handle = syscall.Handle(f.Fd()) + done, err := iosrv.ExecIO(o, "TransmitFile", func(o *operation) error { + return syscall.TransmitFile(o.fd.sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND) + }) if err != nil { return 0, err, false } diff --git a/src/pkg/net/tcp_test.go b/src/pkg/net/tcp_test.go index f356f92f09..dedd41df94 100644 --- a/src/pkg/net/tcp_test.go +++ b/src/pkg/net/tcp_test.go @@ -6,6 +6,7 @@ package net import ( "fmt" + "io" "reflect" "runtime" "sync" @@ -327,3 +328,46 @@ func TestTCPConcurrentAccept(t *testing.T) { ln.Close() wg.Wait() } + +func TestTCPReadWriteMallocs(t *testing.T) { + maxMallocs := 0 + switch runtime.GOOS { + // Add other OSes if you know how many mallocs they do. + case "windows": + maxMallocs = 0 + } + ln, err := Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Listen failed: %v", err) + } + defer ln.Close() + var server Conn + errc := make(chan error) + go func() { + var err error + server, err = ln.Accept() + errc <- err + }() + client, err := Dial("tcp", ln.Addr().String()) + if err != nil { + t.Fatalf("Dial failed: %v", err) + } + if err := <-errc; err != nil { + t.Fatalf("Accept failed: %v", err) + } + defer server.Close() + var buf [128]byte + mallocs := testing.AllocsPerRun(1000, func() { + _, err := server.Write(buf[:]) + if err != nil { + t.Fatalf("Write failed: %v", err) + } + _, err = io.ReadFull(client, buf[:]) + if err != nil { + t.Fatalf("Read failed: %v", err) + } + }) + if int(mallocs) > maxMallocs { + t.Fatalf("Got %v allocs, want %v", mallocs, maxMallocs) + } +} diff --git a/src/pkg/runtime/netpoll_windows.c b/src/pkg/runtime/netpoll_windows.c index e2153c540e..7a95380a33 100644 --- a/src/pkg/runtime/netpoll_windows.c +++ b/src/pkg/runtime/netpoll_windows.c @@ -16,9 +16,9 @@ extern void *runtime·GetQueuedCompletionStatus; #define INVALID_HANDLE_VALUE ((uintptr)-1) -// net_anOp must be the same as beginning of net.anOp. Keep these in sync. -typedef struct net_anOp net_anOp; -struct net_anOp +// net_op must be the same as beginning of net.operation. Keep these in sync. +typedef struct net_op net_op; +struct net_op { // used by windows Overlapped o; @@ -66,7 +66,7 @@ runtime·netpoll(bool block) { uint32 wait, qty, key; int32 mode, errno; - net_anOp *o; + net_op *o; G *gp; if(iocphandle == INVALID_HANDLE_VALUE)