From 6ea7bf253c67ab85a4ba99bb1716112ca139de9f Mon Sep 17 00:00:00 2001 From: Alex Brainman Date: Mon, 22 Jul 2013 12:49:57 +1000 Subject: [PATCH] net: implement netpoll for windows Moves the network poller from net package into runtime. benchmark old ns/op new ns/op delta BenchmarkTCP4OneShot 316386 287061 -9.27% BenchmarkTCP4OneShot-2 339822 313424 -7.77% BenchmarkTCP4OneShot-3 330057 306589 -7.11% BenchmarkTCP4OneShotTimeout 341775 287061 -16.01% BenchmarkTCP4OneShotTimeout-2 380835 295849 -22.32% BenchmarkTCP4OneShotTimeout-3 398412 328070 -17.66% BenchmarkTCP4Persistent 40622 33392 -17.80% BenchmarkTCP4Persistent-2 44528 35736 -19.74% BenchmarkTCP4Persistent-3 44919 36907 -17.84% BenchmarkTCP4PersistentTimeout 45309 33588 -25.87% BenchmarkTCP4PersistentTimeout-2 50289 38079 -24.28% BenchmarkTCP4PersistentTimeout-3 51559 37103 -28.04% BenchmarkTCP6OneShot 361305 345645 -4.33% BenchmarkTCP6OneShot-2 361305 331976 -8.12% BenchmarkTCP6OneShot-3 376929 347598 -7.78% BenchmarkTCP6OneShotTimeout 361305 322212 -10.82% BenchmarkTCP6OneShotTimeout-2 378882 333928 -11.86% BenchmarkTCP6OneShotTimeout-3 388647 335881 -13.58% BenchmarkTCP6Persistent 47653 35345 -25.83% BenchmarkTCP6Persistent-2 49215 35736 -27.39% BenchmarkTCP6Persistent-3 38474 37493 -2.55% BenchmarkTCP6PersistentTimeout 56637 34369 -39.32% BenchmarkTCP6PersistentTimeout-2 42575 38079 -10.56% BenchmarkTCP6PersistentTimeout-3 44137 37689 -14.61% R=dvyukov CC=golang-dev https://golang.org/cl/8670044 --- src/pkg/net/fd_poll_runtime.go | 2 +- src/pkg/net/fd_windows.go | 237 ++++++++++----------------- src/pkg/net/sendfile_windows.go | 2 +- src/pkg/net/sockopt_windows.go | 19 --- src/pkg/runtime/defs_windows.go | 8 +- src/pkg/runtime/defs_windows_386.h | 10 ++ src/pkg/runtime/defs_windows_amd64.h | 10 ++ src/pkg/runtime/netpoll.goc | 2 +- src/pkg/runtime/netpoll_stub.c | 2 +- src/pkg/runtime/netpoll_windows.c | 110 +++++++++++++ 10 files changed, 229 insertions(+), 173 deletions(-) create mode 100644 src/pkg/runtime/netpoll_windows.c diff --git a/src/pkg/net/fd_poll_runtime.go b/src/pkg/net/fd_poll_runtime.go index 03ab3e4292..4f20a7e062 100644 --- a/src/pkg/net/fd_poll_runtime.go +++ b/src/pkg/net/fd_poll_runtime.go @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// +build darwin linux +// +build darwin linux windows package net diff --git a/src/pkg/net/fd_windows.go b/src/pkg/net/fd_windows.go index fefd174baf..548c04e374 100644 --- a/src/pkg/net/fd_windows.go +++ b/src/pkg/net/fd_windows.go @@ -74,39 +74,39 @@ type anOpIface interface { Submit() error } -// IO completion result parameters. -type ioResult struct { - qty uint32 - err error -} - // anOp implements functionality common to all IO operations. +// Its beginning must be the same as runtime.net_anOp. Keep these in sync. type anOp struct { // Used by IOCP interface, it must be first field // of the struct, as our code rely on it. o syscall.Overlapped - resultc chan ioResult - errnoc chan error - fd *netFD + // fields used by runtime.netpoll + runtimeCtx uintptr + mode int32 + errno int32 + qty uint32 + + errnoc chan error + fd *netFD } -func (o *anOp) Init(fd *netFD, mode int) { +func (o *anOp) Init(fd *netFD, mode int32) { o.fd = fd - var i int - if mode == 'r' { - i = 0 - } else { - i = 1 - } - if fd.resultc[i] == nil { - fd.resultc[i] = make(chan ioResult, 1) - } - o.resultc = fd.resultc[i] - if fd.errnoc[i] == nil { - fd.errnoc[i] = make(chan error) + o.mode = mode + o.runtimeCtx = fd.pd.runtimeCtx + if !canCancelIO { + var i int + if mode == 'r' { + i = 0 + } else { + i = 1 + } + if fd.errnoc[i] == nil { + fd.errnoc[i] = make(chan error) + } + o.errnoc = fd.errnoc[i] } - o.errnoc = fd.errnoc[i] } func (o *anOp) Op() *anOp { @@ -120,7 +120,7 @@ type bufOp struct { buf syscall.WSABuf } -func (o *bufOp) Init(fd *netFD, buf []byte, mode int) { +func (o *bufOp) Init(fd *netFD, buf []byte, mode int32) { o.anOp.Init(fd, mode) o.buf.Len = uint32(len(buf)) if len(buf) == 0 { @@ -130,41 +130,6 @@ func (o *bufOp) Init(fd *netFD, buf []byte, mode int) { } } -// resultSrv will retrieve all IO completion results from -// iocp and send them to the correspondent waiting client -// goroutine via channel supplied in the request. -type resultSrv struct { - iocp syscall.Handle -} - -func runtime_blockingSyscallHint() - -func (s *resultSrv) Run() { - var o *syscall.Overlapped - var key uint32 - var r ioResult - for { - r.err = syscall.GetQueuedCompletionStatus(s.iocp, &(r.qty), &key, &o, 0) - if r.err == syscall.Errno(syscall.WAIT_TIMEOUT) && o == nil { - runtime_blockingSyscallHint() - r.err = syscall.GetQueuedCompletionStatus(s.iocp, &(r.qty), &key, &o, syscall.INFINITE) - } - switch { - case r.err == nil: - // Dequeued successfully completed IO packet. - case r.err == syscall.Errno(syscall.WAIT_TIMEOUT) && o == nil: - // Wait has timed out (should not happen now, but might be used in the future). - panic("GetQueuedCompletionStatus timed out") - case o == nil: - // Failed to dequeue anything -> report the error. - panic("GetQueuedCompletionStatus failed " + r.err.Error()) - default: - // Dequeued failed IO packet. - } - (*anOp)(unsafe.Pointer(o)).resultc <- r - } -} - // ioSrv executes net IO requests. type ioSrv struct { submchan chan anOpIface // submit IO requests @@ -192,18 +157,14 @@ func (s *ioSrv) ProcessRemoteIO() { // ExecIO executes a single IO operation oi. It submits and cancels // IO in the current thread for systems where Windows CancelIoEx API // is available. Alternatively, it passes the request onto -// a special goroutine and waits for completion or cancels request. -// deadline is unix nanos. -func (s *ioSrv) ExecIO(oi anOpIface, deadline int64) (int, error) { +// runtime netpoll and waits for completion or cancels request. +func (s *ioSrv) ExecIO(oi anOpIface) (int, error) { var err error o := oi.Op() - // Calculate timeout delta. - var delta int64 - if deadline != 0 { - delta = deadline - time.Now().UnixNano() - if delta <= 0 { - return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, errTimeout} - } + // Notify runtime netpoll about starting IO. + err = o.fd.pd.Prepare(int(o.mode)) + if err != nil { + return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err} } // Start IO. if canCancelIO { @@ -223,67 +184,56 @@ func (s *ioSrv) ExecIO(oi anOpIface, deadline int64) (int, error) { default: return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err} } - // Setup timer, if deadline is given. - var timer <-chan time.Time - if delta > 0 { - t := time.NewTimer(time.Duration(delta) * time.Nanosecond) - defer t.Stop() - timer = t.C - } // Wait for our request to complete. - var r ioResult - var cancelled, timeout bool - select { - case r = <-o.resultc: - case <-timer: - cancelled = true - timeout = true - case <-o.fd.closec: - cancelled = true - } - if cancelled { - // Cancel it. - if canCancelIO { - err := syscall.CancelIoEx(syscall.Handle(o.Op().fd.sysfd), &o.o) - // Assuming ERROR_NOT_FOUND is returned, if IO is completed. - if err != nil && err != syscall.ERROR_NOT_FOUND { - // TODO(brainman): maybe do something else, but panic. - panic(err) - } - } else { - s.canchan <- oi - <-o.errnoc - } - // Wait for IO to be canceled or complete successfully. - r = <-o.resultc - if r.err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled - if timeout { - r.err = errTimeout - } else { - r.err = errClosing - } + err = o.fd.pd.Wait(int(o.mode)) + if err == nil { + // All is good. Extract our IO results and return. + if o.errno != 0 { + err = syscall.Errno(o.errno) + return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err} } + return int(o.qty), nil } - if r.err != nil { - err = &OpError{oi.Name(), o.fd.net, o.fd.laddr, r.err} + // IO is interrupted by "close" or "timeout" + netpollErr := err + switch netpollErr { + case errClosing, errTimeout: + // will deal with those. + default: + panic("net: unexpected runtime.netpoll error: " + netpollErr.Error()) } - return int(r.qty), err + // Cancel our request. + if canCancelIO { + err := syscall.CancelIoEx(syscall.Handle(o.Op().fd.sysfd), &o.o) + // Assuming ERROR_NOT_FOUND is returned, if IO is completed. + if err != nil && err != syscall.ERROR_NOT_FOUND { + // TODO(brainman): maybe do something else, but panic. + panic(err) + } + } else { + s.canchan <- oi + <-o.errnoc + } + // Wait for cancellation to complete. + o.fd.pd.WaitCanceled(int(o.mode)) + if o.errno != 0 { + err = syscall.Errno(o.errno) + if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled + err = netpollErr + } + return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err} + } + // We issued cancellation request. But, it seems, IO operation succeeded + // before cancellation request run. We need to treat IO operation as + // succeeded (the bytes are actually sent/recv from network). + return int(o.qty), nil } // Start helper goroutines. -var resultsrv *resultSrv var iosrv *ioSrv var onceStartServer sync.Once func startServer() { - resultsrv = new(resultSrv) - var err error - resultsrv.iocp, err = syscall.CreateIoCompletionPort(syscall.InvalidHandle, 0, 0, 1) - if err != nil { - panic("CreateIoCompletionPort: " + err.Error()) - } - go resultsrv.Run() - iosrv = new(ioSrv) if !canCancelIO { // Only CancelIo API is available. Lets start special goroutine @@ -309,38 +259,30 @@ type netFD struct { net string laddr Addr raddr Addr - resultc [2]chan ioResult // read/write completion results - errnoc [2]chan error // read/write submit or cancel operation errors - closec chan bool // used by Close to cancel pending IO + errnoc [2]chan error // read/write submit or cancel operation errors // serialize access to Read and Write methods rio, wio sync.Mutex - // read and write deadlines - rdeadline, wdeadline deadline + // wait server + pd pollDesc } -func allocFD(fd syscall.Handle, family, sotype int, net string) *netFD { +func newFD(fd syscall.Handle, family, sotype int, net string) (*netFD, error) { + if initErr != nil { + return nil, initErr + } + onceStartServer.Do(startServer) netfd := &netFD{ sysfd: fd, family: family, sotype: sotype, net: net, - closec: make(chan bool), - } - return netfd -} - -func newFD(fd syscall.Handle, family, proto int, net string) (*netFD, error) { - if initErr != nil { - return nil, initErr } - onceStartServer.Do(startServer) - // Associate our socket with resultsrv.iocp. - if _, err := syscall.CreateIoCompletionPort(syscall.Handle(fd), resultsrv.iocp, 0, 0); err != nil { + if err := netfd.pd.Init(netfd); err != nil { return nil, err } - return allocFD(fd, family, proto, net), nil + return netfd, nil } func (fd *netFD) setAddr(laddr, raddr Addr) { @@ -386,7 +328,7 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error { var o connectOp o.Init(fd, 'w') o.ra = ra - _, err := iosrv.ExecIO(&o, fd.wdeadline.value()) + _, err := iosrv.ExecIO(&o) if err != nil { return err } @@ -438,7 +380,7 @@ func (fd *netFD) Close() error { } defer fd.decref() // unblock pending reader and writer - close(fd.closec) + fd.pd.Evict() // wait for both reader and writer to exit fd.rio.Lock() defer fd.rio.Unlock() @@ -495,7 +437,7 @@ func (fd *netFD) Read(buf []byte) (int, error) { defer fd.rio.Unlock() var o readOp o.Init(fd, buf, 'r') - n, err := iosrv.ExecIO(&o, fd.rdeadline.value()) + n, err := iosrv.ExecIO(&o) if err == nil && n == 0 { err = io.EOF } @@ -532,7 +474,7 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) { var o readFromOp o.Init(fd, buf, 'r') o.rsan = int32(unsafe.Sizeof(o.rsa)) - n, err = iosrv.ExecIO(&o, fd.rdeadline.value()) + n, err = iosrv.ExecIO(&o) if err != nil { return 0, nil, err } @@ -564,7 +506,7 @@ func (fd *netFD) Write(buf []byte) (int, error) { defer fd.wio.Unlock() var o writeOp o.Init(fd, buf, 'w') - return iosrv.ExecIO(&o, fd.wdeadline.value()) + return iosrv.ExecIO(&o) } // WriteTo to network. @@ -596,7 +538,7 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { var o writeToOp o.Init(fd, buf, 'w') o.sa = sa - return iosrv.ExecIO(&o, fd.wdeadline.value()) + return iosrv.ExecIO(&o) } // Accept new network connections. @@ -631,17 +573,17 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { } // Associate our new socket with IOCP. - onceStartServer.Do(startServer) - if _, err := syscall.CreateIoCompletionPort(s, resultsrv.iocp, 0, 0); err != nil { + netfd, err := newFD(s, fd.family, fd.sotype, fd.net) + if err != nil { closesocket(s) - return nil, &OpError{"CreateIoCompletionPort", fd.net, fd.laddr, err} + return nil, &OpError{"accept", fd.net, fd.laddr, err} } // Submit accept request. var o acceptOp o.Init(fd, 'r') o.newsock = s - _, err = iosrv.ExecIO(&o, fd.rdeadline.value()) + _, err = iosrv.ExecIO(&o) if err != nil { closesocket(s) return nil, err @@ -663,7 +605,6 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { lsa, _ := lrsa.Sockaddr() rsa, _ := rrsa.Sockaddr() - netfd := allocFD(s, fd.family, fd.sotype, fd.net) netfd.setAddr(toAddr(lsa), toAddr(rsa)) return netfd, nil } diff --git a/src/pkg/net/sendfile_windows.go b/src/pkg/net/sendfile_windows.go index 2d64f2f5bf..5012583b2c 100644 --- a/src/pkg/net/sendfile_windows.go +++ b/src/pkg/net/sendfile_windows.go @@ -59,7 +59,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { o.Init(c, 'w') o.n = uint32(n) o.src = syscall.Handle(f.Fd()) - done, err := iosrv.ExecIO(&o, 0) + done, err := iosrv.ExecIO(&o) if err != nil { return 0, err, false } diff --git a/src/pkg/net/sockopt_windows.go b/src/pkg/net/sockopt_windows.go index 0861fe8f4b..509b5963bf 100644 --- a/src/pkg/net/sockopt_windows.go +++ b/src/pkg/net/sockopt_windows.go @@ -9,7 +9,6 @@ package net import ( "os" "syscall" - "time" ) func setDefaultSockopts(s syscall.Handle, f, t int, ipv6only bool) error { @@ -48,21 +47,3 @@ func setDefaultMulticastSockopts(s syscall.Handle) error { } return nil } - -// TODO(dfc) these unused error returns could be removed - -func setReadDeadline(fd *netFD, t time.Time) error { - fd.rdeadline.setTime(t) - return nil -} - -func setWriteDeadline(fd *netFD, t time.Time) error { - fd.wdeadline.setTime(t) - return nil -} - -func setDeadline(fd *netFD, t time.Time) error { - setReadDeadline(fd, t) - setWriteDeadline(fd, t) - return nil -} diff --git a/src/pkg/runtime/defs_windows.go b/src/pkg/runtime/defs_windows.go index 0d525b9322..01aea92dee 100644 --- a/src/pkg/runtime/defs_windows.go +++ b/src/pkg/runtime/defs_windows.go @@ -7,8 +7,8 @@ /* Input to cgo. -GOARCH=amd64 cgo -cdefs defs.go >amd64/defs.h -GOARCH=386 cgo -cdefs defs.go >386/defs.h +GOARCH=amd64 go tool cgo -cdefs defs_windows.go > defs_windows_amd64.h +GOARCH=386 go tool cgo -cdefs defs_windows.go > defs_windows_386.h */ package runtime @@ -57,6 +57,9 @@ const ( EXCEPTION_FLT_UNDERFLOW = C.STATUS_FLOAT_UNDERFLOW EXCEPTION_INT_DIVIDE_BY_ZERO = C.STATUS_INTEGER_DIVIDE_BY_ZERO EXCEPTION_INT_OVERFLOW = C.STATUS_INTEGER_OVERFLOW + + INFINITE = C.INFINITE + WAIT_TIMEOUT = C.WAIT_TIMEOUT ) type SystemInfo C.SYSTEM_INFO @@ -64,3 +67,4 @@ type ExceptionRecord C.EXCEPTION_RECORD type FloatingSaveArea C.FLOATING_SAVE_AREA type M128a C.M128A type Context C.CONTEXT +type Overlapped C.OVERLAPPED diff --git a/src/pkg/runtime/defs_windows_386.h b/src/pkg/runtime/defs_windows_386.h index 3377db99e6..db3629a1d2 100644 --- a/src/pkg/runtime/defs_windows_386.h +++ b/src/pkg/runtime/defs_windows_386.h @@ -30,6 +30,9 @@ enum { EXCEPTION_FLT_UNDERFLOW = 0xc0000093, EXCEPTION_INT_DIVIDE_BY_ZERO = 0xc0000094, EXCEPTION_INT_OVERFLOW = 0xc0000095, + + INFINITE = 0xffffffff, + WAIT_TIMEOUT = 0x102, }; typedef struct SystemInfo SystemInfo; @@ -37,6 +40,7 @@ typedef struct ExceptionRecord ExceptionRecord; typedef struct FloatingSaveArea FloatingSaveArea; typedef struct M128a M128a; typedef struct Context Context; +typedef struct Overlapped Overlapped; #pragma pack on @@ -98,6 +102,12 @@ struct Context { uint32 SegSs; uint8 ExtendedRegisters[512]; }; +struct Overlapped { + uint32 Internal; + uint32 InternalHigh; + byte anon0[8]; + byte *hEvent; +}; #pragma pack off diff --git a/src/pkg/runtime/defs_windows_amd64.h b/src/pkg/runtime/defs_windows_amd64.h index c0a99ea78c..fe26f5a84a 100644 --- a/src/pkg/runtime/defs_windows_amd64.h +++ b/src/pkg/runtime/defs_windows_amd64.h @@ -30,6 +30,9 @@ enum { EXCEPTION_FLT_UNDERFLOW = 0xc0000093, EXCEPTION_INT_DIVIDE_BY_ZERO = 0xc0000094, EXCEPTION_INT_OVERFLOW = 0xc0000095, + + INFINITE = 0xffffffff, + WAIT_TIMEOUT = 0x102, }; typedef struct SystemInfo SystemInfo; @@ -37,6 +40,7 @@ typedef struct ExceptionRecord ExceptionRecord; typedef struct FloatingSaveArea FloatingSaveArea; typedef struct M128a M128a; typedef struct Context Context; +typedef struct Overlapped Overlapped; #pragma pack on @@ -113,6 +117,12 @@ struct Context { uint64 LastExceptionToRip; uint64 LastExceptionFromRip; }; +struct Overlapped { + uint64 Internal; + uint64 InternalHigh; + byte anon0[8]; + byte *hEvent; +}; #pragma pack off diff --git a/src/pkg/runtime/netpoll.goc b/src/pkg/runtime/netpoll.goc index e9c0218393..66557cc238 100644 --- a/src/pkg/runtime/netpoll.goc +++ b/src/pkg/runtime/netpoll.goc @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// +build darwin linux +// +build darwin linux windows package net diff --git a/src/pkg/runtime/netpoll_stub.c b/src/pkg/runtime/netpoll_stub.c index 39d19a4cea..c6ecf67d15 100644 --- a/src/pkg/runtime/netpoll_stub.c +++ b/src/pkg/runtime/netpoll_stub.c @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// +build freebsd netbsd openbsd plan9 windows +// +build freebsd netbsd openbsd plan9 #include "runtime.h" diff --git a/src/pkg/runtime/netpoll_windows.c b/src/pkg/runtime/netpoll_windows.c new file mode 100644 index 0000000000..52ba7e46e6 --- /dev/null +++ b/src/pkg/runtime/netpoll_windows.c @@ -0,0 +1,110 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +#include "runtime.h" +#include "defs_GOOS_GOARCH.h" +#include "os_GOOS.h" + +#define DWORD_MAX 0xffffffff + +#pragma dynimport runtime·CreateIoCompletionPort CreateIoCompletionPort "kernel32.dll" +#pragma dynimport runtime·GetQueuedCompletionStatus GetQueuedCompletionStatus "kernel32.dll" + +extern void *runtime·CreateIoCompletionPort; +extern void *runtime·GetQueuedCompletionStatus; + +#define INVALID_HANDLE_VALUE ((uintptr)-1) + +// net_anOp must be the same as beginning of net.anOp. Keep these in sync. +typedef struct net_anOp net_anOp; +struct net_anOp +{ + // used by windows + Overlapped o; + // used by netpoll + uintptr runtimeCtx; + int32 mode; + int32 errno; + uint32 qty; +}; + +static uintptr iocphandle = INVALID_HANDLE_VALUE; // completion port io handle + +void +runtime·netpollinit(void) +{ + iocphandle = (uintptr)runtime·stdcall(runtime·CreateIoCompletionPort, 4, INVALID_HANDLE_VALUE, (uintptr)0, (uintptr)0, (uintptr)DWORD_MAX); + if(iocphandle == 0) { + runtime·printf("netpoll: failed to create iocp handle (errno=%d)\n", runtime·getlasterror()); + runtime·throw("netpoll: failed to create iocp handle"); + } + return; +} + +int32 +runtime·netpollopen(uintptr fd, PollDesc *pd) +{ + USED(pd); + if(runtime·stdcall(runtime·CreateIoCompletionPort, 4, fd, iocphandle, (uintptr)0, (uintptr)0) == 0) + return -runtime·getlasterror(); + return 0; +} + +int32 +runtime·netpollclose(uintptr fd) +{ + // nothing to do + USED(fd); + return 0; +} + +// Polls for completed network IO. +// Returns list of goroutines that become runnable. +G* +runtime·netpoll(bool block) +{ + uint32 wait, qty, key; + int32 mode, errno; + net_anOp *o; + G *gp; + + if(iocphandle == INVALID_HANDLE_VALUE) + return nil; + o = nil; + errno = 0; + qty = 0; + wait = INFINITE; + if(!block) + // TODO(brainman): should use 0 here instead, but scheduler hogs CPU + wait = 1; + // TODO(brainman): Need a loop here to fetch all pending notifications + // (or at least a batch). Scheduler will behave better if is given + // a batch of newly runnable goroutines. + // TODO(brainman): Call GetQueuedCompletionStatusEx() here when possible. + if(runtime·stdcall(runtime·GetQueuedCompletionStatus, 5, iocphandle, &qty, &key, &o, (uintptr)wait) == 0) { + errno = runtime·getlasterror(); + if(o == nil && errno == WAIT_TIMEOUT) { + if(!block) + return nil; + runtime·throw("netpoll: GetQueuedCompletionStatus timed out"); + } + if(o == nil) { + runtime·printf("netpoll: GetQueuedCompletionStatus failed (errno=%d)\n", errno); + runtime·throw("netpoll: GetQueuedCompletionStatus failed"); + } + // dequeued failed IO packet, so report that + } + if(o == nil) + runtime·throw("netpoll: GetQueuedCompletionStatus returned o == nil"); + mode = o->mode; + if(mode != 'r' && mode != 'w') { + runtime·printf("netpoll: GetQueuedCompletionStatus returned invalid mode=%d\n", mode); + runtime·throw("netpoll: GetQueuedCompletionStatus returned invalid mode"); + } + o->errno = errno; + o->qty = qty; + gp = nil; + runtime·netpollready(&gp, (void*)o->runtimeCtx, mode); + return gp; +} -- 2.48.1