From: Alex Brainman Date: Tue, 27 Aug 2013 04:53:57 +0000 (+1000) Subject: net: have separate read and write processing threads on windows X-Git-Tag: go1.2rc2~421 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=11320fa500c7201065baf1958e237ce4a03b3030;p=gostls13.git net: have separate read and write processing threads on windows Fixes #4195 R=golang-dev, mikioh.mikioh CC=golang-dev https://golang.org/cl/12960046 --- diff --git a/src/pkg/net/fd_plan9.go b/src/pkg/net/fd_plan9.go index 84987c3a9e..0d9dc54408 100644 --- a/src/pkg/net/fd_plan9.go +++ b/src/pkg/net/fd_plan9.go @@ -18,8 +18,6 @@ type netFD struct { laddr, raddr Addr } -var canCancelIO = true // used for testing current package - func sysInit() { } diff --git a/src/pkg/net/fd_unix.go b/src/pkg/net/fd_unix.go index 58cfd04f35..bdc2861d55 100644 --- a/src/pkg/net/fd_unix.go +++ b/src/pkg/net/fd_unix.go @@ -33,8 +33,6 @@ type netFD struct { pd pollDesc } -var canCancelIO = true // used for testing current package - func sysInit() { } diff --git a/src/pkg/net/fd_windows.go b/src/pkg/net/fd_windows.go index 78c7565747..75f7a63caa 100644 --- a/src/pkg/net/fd_windows.go +++ b/src/pkg/net/fd_windows.go @@ -234,16 +234,20 @@ func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) erro } // Start helper goroutines. -var iosrv *ioSrv +var rsrv, wsrv *ioSrv var onceStartServer sync.Once func startServer() { - iosrv = new(ioSrv) + rsrv = new(ioSrv) + wsrv = new(ioSrv) if !canCancelIO { - // Only CancelIo API is available. Lets start special goroutine - // locked to an OS thread, that both starts and cancels IO. - iosrv.req = make(chan ioSrvReq) - go iosrv.ProcessRemoteIO() + // Only CancelIo API is available. Lets start two special goroutines + // locked to an OS thread, that both starts and cancels IO. One will + // process read requests, while other will do writes. + rsrv.req = make(chan ioSrvReq) + go rsrv.ProcessRemoteIO() + wsrv.req = make(chan ioSrvReq) + go wsrv.ProcessRemoteIO() } } @@ -337,7 +341,7 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error { // Call ConnectEx API. o := &fd.wop o.sa = ra - _, err := iosrv.ExecIO(o, "ConnectEx", func(o *operation) error { + _, err := wsrv.ExecIO(o, "ConnectEx", func(o *operation) error { return syscall.ConnectEx(o.fd.sysfd, o.sa, nil, 0, nil, &o.o) }) if err != nil { @@ -446,7 +450,7 @@ func (fd *netFD) Read(buf []byte) (int, error) { defer fd.readUnlock() o := &fd.rop o.InitBuf(buf) - n, err := iosrv.ExecIO(o, "WSARecv", func(o *operation) error { + n, err := rsrv.ExecIO(o, "WSARecv", func(o *operation) error { return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil) }) if err == nil && n == 0 { @@ -468,7 +472,7 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) { defer fd.readUnlock() o := &fd.rop o.InitBuf(buf) - n, err = iosrv.ExecIO(o, "WSARecvFrom", func(o *operation) error { + n, err = rsrv.ExecIO(o, "WSARecvFrom", func(o *operation) error { if o.rsa == nil { o.rsa = new(syscall.RawSockaddrAny) } @@ -492,7 +496,7 @@ func (fd *netFD) Write(buf []byte) (int, error) { } o := &fd.wop o.InitBuf(buf) - return iosrv.ExecIO(o, "WSASend", func(o *operation) error { + return wsrv.ExecIO(o, "WSASend", func(o *operation) error { return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil) }) } @@ -508,7 +512,7 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { o := &fd.wop o.InitBuf(buf) o.sa = sa - return iosrv.ExecIO(o, "WSASendto", func(o *operation) error { + return wsrv.ExecIO(o, "WSASendto", func(o *operation) error { return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil) }) } @@ -541,7 +545,7 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { o.handle = s var rawsa [2]syscall.RawSockaddrAny o.rsan = int32(unsafe.Sizeof(rawsa[0])) - _, err = iosrv.ExecIO(o, "AcceptEx", func(o *operation) error { + _, err = rsrv.ExecIO(o, "AcceptEx", func(o *operation) error { return syscall.AcceptEx(o.fd.sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o) }) if err != nil { diff --git a/src/pkg/net/http/transport_test.go b/src/pkg/net/http/transport_test.go index df01a65667..e4df30a98d 100644 --- a/src/pkg/net/http/transport_test.go +++ b/src/pkg/net/http/transport_test.go @@ -830,7 +830,7 @@ func TestTransportPersistConnLeakShortBody(t *testing.T) { } nhigh := runtime.NumGoroutine() tr.CloseIdleConnections() - time.Sleep(50 * time.Millisecond) + time.Sleep(400 * time.Millisecond) runtime.GC() nfinal := runtime.NumGoroutine() diff --git a/src/pkg/net/sendfile_windows.go b/src/pkg/net/sendfile_windows.go index 0107f679b3..b128ba27b0 100644 --- a/src/pkg/net/sendfile_windows.go +++ b/src/pkg/net/sendfile_windows.go @@ -42,7 +42,7 @@ func sendFile(fd *netFD, r io.Reader) (written int64, err error, handled bool) { o := &fd.wop o.qty = uint32(n) o.handle = syscall.Handle(f.Fd()) - done, err := iosrv.ExecIO(o, "TransmitFile", func(o *operation) error { + done, err := wsrv.ExecIO(o, "TransmitFile", func(o *operation) error { return syscall.TransmitFile(o.fd.sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND) }) if err != nil { diff --git a/src/pkg/net/timeout_test.go b/src/pkg/net/timeout_test.go index 350ec8f7b1..a14a88169b 100644 --- a/src/pkg/net/timeout_test.go +++ b/src/pkg/net/timeout_test.go @@ -325,9 +325,6 @@ func TestReadWriteDeadline(t *testing.T) { t.Skipf("skipping test on %q", runtime.GOOS) } - if !canCancelIO { - t.Skip("skipping test on this system") - } const ( readTimeout = 50 * time.Millisecond writeTimeout = 250 * time.Millisecond