"internal/race"
"internal/syscall/windows"
"io"
- "runtime"
"sync"
"syscall"
"unicode/utf16"
ioSync uint64
)
-// CancelIo Windows API cancels all outstanding IO for a particular
-// socket on current thread. To overcome that limitation, we run
-// special goroutine, locked to OS single thread, that both starts
-// and cancels IO. It means, there are 2 unavoidable thread switches
-// for every IO.
-// Some newer versions of Windows has new CancelIoEx API, that does
-// not have that limitation and can be used from any thread. This
-// package uses CancelIoEx API, if present, otherwise it fallback
-// to CancelIo.
-
-var canCancelIO bool // determines if CancelIoEx API is present
-
// This package uses the SetFileCompletionNotificationModes Windows
// API to skip calling GetQueuedCompletionStatus if an IO operation
// completes synchronously. There is a known bug where
if e != nil {
initErr = e
}
- canCancelIO = syscall.LoadCancelIoEx() == nil
checkSetFileCompletionNotificationModes()
}
// fields used only by net package
fd *FD
- errc chan error
buf syscall.WSABuf
msg windows.WSAMsg
sa syscall.Sockaddr
}
}
-// ioSrv executes net IO requests.
-type ioSrv struct {
- req chan ioSrvReq
-}
-
-type ioSrvReq struct {
- o *operation
- submit func(o *operation) error // if nil, cancel the operation
-}
-
-// ProcessRemoteIO will execute submit IO requests on behalf
-// of other goroutines, all on a single os thread, so it can
-// cancel them later. Results of all operations will be sent
-// back to their requesters via channel supplied in request.
-// It is used only when the CancelIoEx API is unavailable.
-func (s *ioSrv) ProcessRemoteIO() {
- runtime.LockOSThread()
- defer runtime.UnlockOSThread()
- for r := range s.req {
- if r.submit != nil {
- r.o.errc <- r.submit(r.o)
- } else {
- r.o.errc <- syscall.CancelIo(r.o.fd.Sysfd)
- }
- }
-}
-
-// ExecIO executes a single IO operation o. It submits and cancels
+// execIO executes a single IO operation o. It submits and cancels
// IO in the current thread for systems where Windows CancelIoEx API
// is available. Alternatively, it passes the request onto
// runtime netpoll and waits for completion or cancels request.
-func (s *ioSrv) ExecIO(o *operation, submit func(o *operation) error) (int, error) {
+func execIO(o *operation, submit func(o *operation) error) (int, error) {
if o.fd.pd.runtimeCtx == 0 {
return 0, errors.New("internal error: polling on unsupported descriptor type")
}
- if !canCancelIO {
- onceStartServer.Do(startServer)
- }
-
fd := o.fd
// Notify runtime netpoll about starting IO.
err := fd.pd.prepare(int(o.mode), fd.isFile)
return 0, err
}
// Start IO.
- if canCancelIO {
- err = submit(o)
- } else {
- // Send request to a special dedicated thread,
- // so it can stop the IO with CancelIO later.
- s.req <- ioSrvReq{o, submit}
- err = <-o.errc
- }
+ err = submit(o)
switch err {
case nil:
// IO completed immediately
panic("unexpected runtime.netpoll error: " + netpollErr.Error())
}
// Cancel our request.
- if canCancelIO {
- err := syscall.CancelIoEx(fd.Sysfd, &o.o)
- // Assuming ERROR_NOT_FOUND is returned, if IO is completed.
- if err != nil && err != syscall.ERROR_NOT_FOUND {
- // TODO(brainman): maybe do something else, but panic.
- panic(err)
- }
- } else {
- s.req <- ioSrvReq{o, nil}
- <-o.errc
+ err = syscall.CancelIoEx(fd.Sysfd, &o.o)
+ // Assuming ERROR_NOT_FOUND is returned, if IO is completed.
+ if err != nil && err != syscall.ERROR_NOT_FOUND {
+ // TODO(brainman): maybe do something else, but panic.
+ panic(err)
}
// Wait for cancellation to complete.
fd.pd.waitCanceled(int(o.mode))
return int(o.qty), nil
}
-// Start helper goroutines.
-var rsrv, wsrv ioSrv
-var onceStartServer sync.Once
-
-func startServer() {
- // This is called, once, when only the CancelIo API is available.
- // Start two special goroutines, both locked to an OS thread,
- // that start and cancel IO requests.
- // One will process read requests, while the other will do writes.
- rsrv.req = make(chan ioSrvReq)
- go rsrv.ProcessRemoteIO()
- wsrv.req = make(chan ioSrvReq)
- go wsrv.ProcessRemoteIO()
-}
-
// FD is a file descriptor. The net and os packages embed this type in
// a larger type representing a network connection or OS file.
type FD struct {
// if the user is doing their own overlapped I/O.
// See issue #21172.
//
- // In general the code below avoids calling the ExecIO
- // method for non-network sockets. If some method does
- // somehow call ExecIO, then ExecIO, and therefore the
+ // In general the code below avoids calling the execIO
+ // function for non-network sockets. If some method does
+ // somehow call execIO, then execIO, and therefore the
// calling method, will return an error, because
// fd.pd.runtimeCtx will be 0.
err = fd.pd.init(fd)
fd.wop.fd = fd
fd.rop.runtimeCtx = fd.pd.runtimeCtx
fd.wop.runtimeCtx = fd.pd.runtimeCtx
- if !canCancelIO {
- fd.rop.errc = make(chan error)
- fd.wop.errc = make(chan error)
- }
return "", nil
}
} else {
o := &fd.rop
o.InitBuf(buf)
- n, err = rsrv.ExecIO(o, func(o *operation) error {
+ n, err = execIO(o, func(o *operation) error {
return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
})
if race.Enabled {
defer fd.readUnlock()
o := &fd.rop
o.InitBuf(buf)
- n, err := rsrv.ExecIO(o, func(o *operation) error {
+ n, err := execIO(o, func(o *operation) error {
if o.rsa == nil {
o.rsa = new(syscall.RawSockaddrAny)
}
}
o := &fd.wop
o.InitBuf(b)
- n, err = wsrv.ExecIO(o, func(o *operation) error {
+ n, err = execIO(o, func(o *operation) error {
return syscall.WSASend(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil)
})
}
}
o := &fd.wop
o.InitBufs(buf)
- n, err := wsrv.ExecIO(o, func(o *operation) error {
+ n, err := execIO(o, func(o *operation) error {
return syscall.WSASend(o.fd.Sysfd, &o.bufs[0], uint32(len(o.bufs)), &o.qty, 0, &o.o, nil)
})
o.ClearBufs()
o := &fd.wop
o.InitBuf(buf)
o.sa = sa
- n, err := wsrv.ExecIO(o, func(o *operation) error {
+ n, err := execIO(o, func(o *operation) error {
return syscall.WSASendto(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil)
})
return n, err
o := &fd.wop
o.InitBuf(b)
o.sa = sa
- n, err := wsrv.ExecIO(o, func(o *operation) error {
+ n, err := execIO(o, func(o *operation) error {
return syscall.WSASendto(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil)
})
ntotal += int(n)
func (fd *FD) ConnectEx(ra syscall.Sockaddr) error {
o := &fd.wop
o.sa = ra
- _, err := wsrv.ExecIO(o, func(o *operation) error {
+ _, err := execIO(o, func(o *operation) error {
return ConnectExFunc(o.fd.Sysfd, o.sa, nil, 0, nil, &o.o)
})
return err
// Submit accept request.
o.handle = s
o.rsan = int32(unsafe.Sizeof(rawsa[0]))
- _, err := rsrv.ExecIO(o, func(o *operation) error {
+ _, err := execIO(o, func(o *operation) error {
return AcceptFunc(o.fd.Sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o)
})
if err != nil {
if !fd.IsStream {
o.flags |= windows.MSG_PEEK
}
- _, err := rsrv.ExecIO(o, func(o *operation) error {
+ _, err := execIO(o, func(o *operation) error {
return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
})
if err == windows.WSAEMSGSIZE {
o.rsa = new(syscall.RawSockaddrAny)
o.msg.Name = o.rsa
o.msg.Namelen = int32(unsafe.Sizeof(*o.rsa))
- n, err := rsrv.ExecIO(o, func(o *operation) error {
+ n, err := execIO(o, func(o *operation) error {
return windows.WSARecvMsg(o.fd.Sysfd, &o.msg, &o.qty, &o.o, nil)
})
err = fd.eofError(n, err)
o.msg.Name = (*syscall.RawSockaddrAny)(rsa)
o.msg.Namelen = len
}
- n, err := wsrv.ExecIO(o, func(o *operation) error {
+ n, err := execIO(o, func(o *operation) error {
return windows.WSASendMsg(o.fd.Sysfd, &o.msg, 0, &o.qty, &o.o, nil)
})
return n, int(o.msg.Control.Len), err