From 0bee99ab3b17caca812aa78a51485aadf0bc1788 Mon Sep 17 00:00:00 2001 From: Dmitriy Vyukov Date: Thu, 14 Mar 2013 10:38:37 +0400 Subject: [PATCH] runtime: integrated network poller for darwin vs tip: benchmark old ns/op new ns/op delta BenchmarkTCP4Persistent 67786 33175 -51.06% BenchmarkTCP4Persistent-2 49085 31227 -36.38% BenchmarkTCP4PersistentTimeout 69265 32565 -52.98% BenchmarkTCP4PersistentTimeout-2 49217 32588 -33.79% vs old scheduler: benchmark old ns/op new ns/op delta BenchmarkTCP4Persistent 63517 33175 -47.77% BenchmarkTCP4Persistent-2 54760 31227 -42.97% BenchmarkTCP4PersistentTimeout 63234 32565 -48.50% BenchmarkTCP4PersistentTimeout-2 56956 32588 -42.78% R=golang-dev, bradfitz, devon.odell, mikioh.mikioh, iant, rsc CC=golang-dev, pabuhr https://golang.org/cl/7569043 --- src/pkg/net/fd_darwin.go | 126 ---------- src/pkg/net/fd_poll_runtime.go | 119 ++++++++++ src/pkg/net/fd_poll_unix.go | 2 +- src/pkg/runtime/defs_darwin.go | 20 +- src/pkg/runtime/defs_darwin_386.h | 30 ++- src/pkg/runtime/defs_darwin_amd64.h | 30 ++- src/pkg/runtime/netpoll.goc | 346 ++++++++++++++++++++++++++++ src/pkg/runtime/netpoll_kqueue.c | 96 ++++++++ src/pkg/runtime/netpoll_stub.c | 2 + src/pkg/runtime/runtime.h | 4 + src/pkg/runtime/sys_darwin_386.s | 29 +++ src/pkg/runtime/sys_darwin_amd64.s | 34 +++ 12 files changed, 705 insertions(+), 133 deletions(-) delete mode 100644 src/pkg/net/fd_darwin.go create mode 100644 src/pkg/net/fd_poll_runtime.go create mode 100644 src/pkg/runtime/netpoll.goc create mode 100644 src/pkg/runtime/netpoll_kqueue.c diff --git a/src/pkg/net/fd_darwin.go b/src/pkg/net/fd_darwin.go deleted file mode 100644 index 382465ba66..0000000000 --- a/src/pkg/net/fd_darwin.go +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2009 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Waiting for FDs via kqueue/kevent. - -package net - -import ( - "errors" - "os" - "syscall" -) - -type pollster struct { - kq int - eventbuf [10]syscall.Kevent_t - events []syscall.Kevent_t - - // An event buffer for AddFD/DelFD. - // Must hold pollServer lock. - kbuf [1]syscall.Kevent_t -} - -func newpollster() (p *pollster, err error) { - p = new(pollster) - if p.kq, err = syscall.Kqueue(); err != nil { - return nil, os.NewSyscallError("kqueue", err) - } - syscall.CloseOnExec(p.kq) - p.events = p.eventbuf[0:0] - return p, nil -} - -// First return value is whether the pollServer should be woken up. -// This version always returns false. -func (p *pollster) AddFD(fd int, mode int, repeat bool) (bool, error) { - // pollServer is locked. - - var kmode int - if mode == 'r' { - kmode = syscall.EVFILT_READ - } else { - kmode = syscall.EVFILT_WRITE - } - ev := &p.kbuf[0] - // EV_ADD - add event to kqueue list - // EV_RECEIPT - generate fake EV_ERROR as result of add, - // rather than waiting for real event - // EV_ONESHOT - delete the event the first time it triggers - flags := syscall.EV_ADD | syscall.EV_RECEIPT - if !repeat { - flags |= syscall.EV_ONESHOT - } - syscall.SetKevent(ev, fd, kmode, flags) - - n, err := syscall.Kevent(p.kq, p.kbuf[:], p.kbuf[:], nil) - if err != nil { - return false, os.NewSyscallError("kevent", err) - } - if n != 1 || (ev.Flags&syscall.EV_ERROR) == 0 || int(ev.Ident) != fd || int(ev.Filter) != kmode { - return false, errors.New("kqueue phase error") - } - if ev.Data != 0 { - return false, syscall.Errno(ev.Data) - } - return false, nil -} - -// Return value is whether the pollServer should be woken up. -// This version always returns false. -func (p *pollster) DelFD(fd int, mode int) bool { - // pollServer is locked. - - var kmode int - if mode == 'r' { - kmode = syscall.EVFILT_READ - } else { - kmode = syscall.EVFILT_WRITE - } - ev := &p.kbuf[0] - // EV_DELETE - delete event from kqueue list - // EV_RECEIPT - generate fake EV_ERROR as result of add, - // rather than waiting for real event - syscall.SetKevent(ev, fd, kmode, syscall.EV_DELETE|syscall.EV_RECEIPT) - syscall.Kevent(p.kq, p.kbuf[0:], p.kbuf[0:], nil) - return false -} - -func (p *pollster) WaitFD(s *pollServer, nsec int64) (fd int, mode int, err error) { - var t *syscall.Timespec - for len(p.events) == 0 { - if nsec > 0 { - if t == nil { - t = new(syscall.Timespec) - } - *t = syscall.NsecToTimespec(nsec) - } - - s.Unlock() - n, err := syscall.Kevent(p.kq, nil, p.eventbuf[:], t) - s.Lock() - - if err != nil { - if err == syscall.EINTR { - continue - } - return -1, 0, os.NewSyscallError("kevent", nil) - } - if n == 0 { - return -1, 0, nil - } - p.events = p.eventbuf[:n] - } - ev := &p.events[0] - p.events = p.events[1:] - fd = int(ev.Ident) - if ev.Filter == syscall.EVFILT_READ { - mode = 'r' - } else { - mode = 'w' - } - return fd, mode, nil -} - -func (p *pollster) Close() error { return os.NewSyscallError("close", syscall.Close(p.kq)) } diff --git a/src/pkg/net/fd_poll_runtime.go b/src/pkg/net/fd_poll_runtime.go new file mode 100644 index 0000000000..2c8b47edc4 --- /dev/null +++ b/src/pkg/net/fd_poll_runtime.go @@ -0,0 +1,119 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build darwin + +package net + +import ( + "sync" + "syscall" + "time" +) + +func runtime_pollServerInit() +func runtime_pollOpen(fd int) (uintptr, int) +func runtime_pollClose(ctx uintptr) +func runtime_pollWait(ctx uintptr, mode int) int +func runtime_pollReset(ctx uintptr, mode int) int +func runtime_pollSetDeadline(ctx uintptr, d int64, mode int) +func runtime_pollUnblock(ctx uintptr) + +var canCancelIO = true // used for testing current package + +type pollDesc struct { + runtimeCtx uintptr +} + +var serverInit sync.Once + +func sysInit() { +} + +func (pd *pollDesc) Init(fd *netFD) error { + serverInit.Do(runtime_pollServerInit) + ctx, errno := runtime_pollOpen(fd.sysfd) + if errno != 0 { + return syscall.Errno(errno) + } + pd.runtimeCtx = ctx + return nil +} + +func (pd *pollDesc) Close() { + runtime_pollClose(pd.runtimeCtx) +} + +func (pd *pollDesc) Lock() { +} + +func (pd *pollDesc) Unlock() { +} + +func (pd *pollDesc) Wakeup() { +} + +// Evict evicts fd from the pending list, unblocking any I/O running on fd. +// Return value is whether the pollServer should be woken up. +func (pd *pollDesc) Evict() bool { + runtime_pollUnblock(pd.runtimeCtx) + return false +} + +func (pd *pollDesc) PrepareRead() error { + res := runtime_pollReset(pd.runtimeCtx, 'r') + return convertErr(res) +} + +func (pd *pollDesc) PrepareWrite() error { + res := runtime_pollReset(pd.runtimeCtx, 'w') + return convertErr(res) +} + +func (pd *pollDesc) WaitRead() error { + res := runtime_pollWait(pd.runtimeCtx, 'r') + return convertErr(res) +} + +func (pd *pollDesc) WaitWrite() error { + res := runtime_pollWait(pd.runtimeCtx, 'w') + return convertErr(res) +} + +func convertErr(res int) error { + switch res { + case 0: + return nil + case 1: + return errClosing + case 2: + return errTimeout + } + panic("unreachable") +} + +func setReadDeadline(fd *netFD, t time.Time) error { + return setDeadlineImpl(fd, t, 'r') +} + +func setWriteDeadline(fd *netFD, t time.Time) error { + return setDeadlineImpl(fd, t, 'w') +} + +func setDeadline(fd *netFD, t time.Time) error { + return setDeadlineImpl(fd, t, 'r'+'w') +} + +func setDeadlineImpl(fd *netFD, t time.Time, mode int) error { + d := t.UnixNano() + if t.IsZero() { + d = 0 + } + if err := fd.incref(false); err != nil { + return err + } + runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode) + fd.decref() + return nil +} diff --git a/src/pkg/net/fd_poll_unix.go b/src/pkg/net/fd_poll_unix.go index 7f7f764f9c..c08eb0033e 100644 --- a/src/pkg/net/fd_poll_unix.go +++ b/src/pkg/net/fd_poll_unix.go @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// +build darwin freebsd linux netbsd openbsd +// +build freebsd linux netbsd openbsd package net diff --git a/src/pkg/runtime/defs_darwin.go b/src/pkg/runtime/defs_darwin.go index 7f22b0b8e5..722013ba96 100644 --- a/src/pkg/runtime/defs_darwin.go +++ b/src/pkg/runtime/defs_darwin.go @@ -7,8 +7,8 @@ /* Input to cgo. -GOARCH=amd64 cgo -cdefs defs_darwin.go >defs_darwin_amd64.h -GOARCH=386 cgo -cdefs defs_darwin.go >defs_darwin_386.h +GOARCH=amd64 go tool cgo -cdefs defs_darwin.go >defs_darwin_amd64.h +GOARCH=386 go tool cgo -cdefs defs_darwin.go >defs_darwin_386.h */ package runtime @@ -19,12 +19,17 @@ package runtime #include #include #include +#include #include +#include #include */ import "C" const ( + EINTR = C.EINTR + EFAULT = C.EFAULT + PROT_NONE = C.PROT_NONE PROT_READ = C.PROT_READ PROT_WRITE = C.PROT_WRITE @@ -128,6 +133,14 @@ const ( ITIMER_REAL = C.ITIMER_REAL ITIMER_VIRTUAL = C.ITIMER_VIRTUAL ITIMER_PROF = C.ITIMER_PROF + + EV_ADD = C.EV_ADD + EV_DELETE = C.EV_DELETE + EV_CLEAR = C.EV_CLEAR + EV_RECEIPT = C.EV_RECEIPT + EV_ERROR = C.EV_ERROR + EVFILT_READ = C.EVFILT_READ + EVFILT_WRITE = C.EVFILT_WRITE ) type MachBody C.mach_msg_body_t @@ -144,6 +157,7 @@ type Sigval C.union_sigval type Siginfo C.siginfo_t type Timeval C.struct_timeval type Itimerval C.struct_itimerval +type Timespec C.struct_timespec type FPControl C.struct_fp_control type FPStatus C.struct_fp_status @@ -161,3 +175,5 @@ type ExceptionState32 C.struct_i386_exception_state type Mcontext32 C.struct_mcontext32 type Ucontext C.struct_ucontext + +type Kevent C.struct_kevent diff --git a/src/pkg/runtime/defs_darwin_386.h b/src/pkg/runtime/defs_darwin_386.h index 92732f4602..7b210eebf8 100644 --- a/src/pkg/runtime/defs_darwin_386.h +++ b/src/pkg/runtime/defs_darwin_386.h @@ -3,6 +3,9 @@ enum { + EINTR = 0x4, + EFAULT = 0xe, + PROT_NONE = 0x0, PROT_READ = 0x1, PROT_WRITE = 0x2, @@ -106,6 +109,14 @@ enum { ITIMER_REAL = 0x0, ITIMER_VIRTUAL = 0x1, ITIMER_PROF = 0x2, + + EV_ADD = 0x1, + EV_DELETE = 0x2, + EV_CLEAR = 0x20, + EV_RECEIPT = 0x40, + EV_ERROR = 0x4000, + EVFILT_READ = -0x1, + EVFILT_WRITE = -0x2, }; typedef struct MachBody MachBody; @@ -117,6 +128,7 @@ typedef struct Sigaction Sigaction; typedef struct Siginfo Siginfo; typedef struct Timeval Timeval; typedef struct Itimerval Itimerval; +typedef struct Timespec Timespec; typedef struct FPControl FPControl; typedef struct FPStatus FPStatus; typedef struct RegMMST RegMMST; @@ -130,6 +142,7 @@ typedef struct FloatState32 FloatState32; typedef struct ExceptionState32 ExceptionState32; typedef struct Mcontext32 Mcontext32; typedef struct Ucontext Ucontext; +typedef struct Kevent Kevent; #pragma pack on @@ -170,7 +183,7 @@ struct StackT { typedef byte Sighandler[4]; struct Sigaction { - Sighandler __sigaction_u; + byte __sigaction_u[4]; void *sa_tramp; uint32 sa_mask; int32 sa_flags; @@ -185,7 +198,7 @@ struct Siginfo { uint32 si_uid; int32 si_status; byte *si_addr; - Sigval si_value; + byte si_value[4]; int32 si_band; uint32 __pad[7]; }; @@ -197,6 +210,10 @@ struct Itimerval { Timeval it_interval; Timeval it_value; }; +struct Timespec { + int32 tv_sec; + int32 tv_nsec; +}; struct FPControl { byte Pad_cgo_0[2]; @@ -362,5 +379,14 @@ struct Ucontext { Mcontext32 *uc_mcontext; }; +struct Kevent { + uint32 ident; + int16 filter; + uint16 flags; + uint32 fflags; + int32 data; + byte *udata; +}; + #pragma pack off diff --git a/src/pkg/runtime/defs_darwin_amd64.h b/src/pkg/runtime/defs_darwin_amd64.h index d4fbfef499..2d464a9e50 100644 --- a/src/pkg/runtime/defs_darwin_amd64.h +++ b/src/pkg/runtime/defs_darwin_amd64.h @@ -3,6 +3,9 @@ enum { + EINTR = 0x4, + EFAULT = 0xe, + PROT_NONE = 0x0, PROT_READ = 0x1, PROT_WRITE = 0x2, @@ -106,6 +109,14 @@ enum { ITIMER_REAL = 0x0, ITIMER_VIRTUAL = 0x1, ITIMER_PROF = 0x2, + + EV_ADD = 0x1, + EV_DELETE = 0x2, + EV_CLEAR = 0x20, + EV_RECEIPT = 0x40, + EV_ERROR = 0x4000, + EVFILT_READ = -0x1, + EVFILT_WRITE = -0x2, }; typedef struct MachBody MachBody; @@ -117,6 +128,7 @@ typedef struct Sigaction Sigaction; typedef struct Siginfo Siginfo; typedef struct Timeval Timeval; typedef struct Itimerval Itimerval; +typedef struct Timespec Timespec; typedef struct FPControl FPControl; typedef struct FPStatus FPStatus; typedef struct RegMMST RegMMST; @@ -130,6 +142,7 @@ typedef struct FloatState32 FloatState32; typedef struct ExceptionState32 ExceptionState32; typedef struct Mcontext32 Mcontext32; typedef struct Ucontext Ucontext; +typedef struct Kevent Kevent; #pragma pack on @@ -171,7 +184,7 @@ struct StackT { typedef byte Sighandler[8]; struct Sigaction { - Sighandler __sigaction_u; + byte __sigaction_u[8]; void *sa_tramp; uint32 sa_mask; int32 sa_flags; @@ -186,7 +199,7 @@ struct Siginfo { uint32 si_uid; int32 si_status; byte *si_addr; - Sigval si_value; + byte si_value[8]; int64 si_band; uint64 __pad[7]; }; @@ -199,6 +212,10 @@ struct Itimerval { Timeval it_interval; Timeval it_value; }; +struct Timespec { + int64 tv_sec; + int64 tv_nsec; +}; struct FPControl { byte Pad_cgo_0[2]; @@ -365,5 +382,14 @@ struct Ucontext { Mcontext64 *uc_mcontext; }; +struct Kevent { + uint64 ident; + int16 filter; + uint16 flags; + uint32 fflags; + int64 data; + byte *udata; +}; + #pragma pack off diff --git a/src/pkg/runtime/netpoll.goc b/src/pkg/runtime/netpoll.goc new file mode 100644 index 0000000000..2c61952700 --- /dev/null +++ b/src/pkg/runtime/netpoll.goc @@ -0,0 +1,346 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package net + +#include "runtime.h" +#include "defs_GOOS_GOARCH.h" +#include "arch_GOARCH.h" +#include "malloc.h" + +// Integrated network poller (platform-independent part). +// A particular implementation (epoll/kqueue) must define the following functions: +// void runtime·netpollinit(void); // to initialize the poller +// int32 runtime·netpollopen(int32 fd, PollDesc *pd); // to arm edge-triggered notifications + // and associate fd with pd. +// An implementation must call the following function to denote that the pd is ready. +// void runtime·netpollready(G **gpp, PollDesc *pd, int32 mode); + +#define READY ((G*)1) + +struct PollDesc +{ + PollDesc* link; // in pollcache, protected by pollcache.Lock + Lock; // protectes the following fields + bool closing; + uintptr seq; // protects from stale timers and ready notifications + G* rg; // G waiting for read or READY (binary semaphore) + Timer rt; // read deadline timer (set if rt.fv != nil) + int64 rd; // read deadline + G* wg; // the same for writes + Timer wt; + int64 wd; +}; + +static struct +{ + Lock; + PollDesc* first; + // PollDesc objects must be type-stable, + // because we can get ready notification from epoll/kqueue + // after the descriptor is closed/reused. + // Stale notifications are detected using seq variable, + // seq is incremented when deadlines are changed or descriptor is reused. +} pollcache; + +static void netpollblock(PollDesc*, int32); +static G* netpollunblock(PollDesc*, int32); +static void deadline(int64, Eface); +static void readDeadline(int64, Eface); +static void writeDeadline(int64, Eface); +static PollDesc* allocPollDesc(void); +static intgo checkerr(PollDesc *pd, int32 mode); + +static FuncVal deadlineFn = {(void(*)(void))deadline}; +static FuncVal readDeadlineFn = {(void(*)(void))readDeadline}; +static FuncVal writeDeadlineFn = {(void(*)(void))writeDeadline}; + +func runtime_pollServerInit() { + runtime·netpollinit(); +} + +func runtime_pollOpen(fd int) (pd *PollDesc, errno int) { + pd = allocPollDesc(); + runtime·lock(pd); + if(pd->wg != nil && pd->wg != READY) + runtime·throw("runtime_pollOpen: blocked write on free descriptor"); + if(pd->rg != nil && pd->rg != READY) + runtime·throw("runtime_pollOpen: blocked read on free descriptor"); + pd->closing = false; + pd->seq++; + pd->rg = nil; + pd->rd = 0; + pd->wg = nil; + pd->wd = 0; + runtime·unlock(pd); + + errno = runtime·netpollopen(fd, pd); +} + +func runtime_pollClose(pd *PollDesc) { + if(!pd->closing) + runtime·throw("runtime_pollClose: close w/o unblock"); + if(pd->wg != nil && pd->wg != READY) + runtime·throw("runtime_pollClose: blocked write on closing descriptor"); + if(pd->rg != nil && pd->rg != READY) + runtime·throw("runtime_pollClose: blocked read on closing descriptor"); + runtime·lock(&pollcache); + pd->link = pollcache.first; + pollcache.first = pd; + runtime·unlock(&pollcache); +} + +func runtime_pollReset(pd *PollDesc, mode int) (err int) { + runtime·lock(pd); + err = checkerr(pd, mode); + if(err) + goto ret; + if(mode == 'r') + pd->rg = nil; + else if(mode == 'w') + pd->wg = nil; +ret: + runtime·unlock(pd); +} + +func runtime_pollWait(pd *PollDesc, mode int) (err int) { + runtime·lock(pd); + err = checkerr(pd, mode); + if(err) + goto ret; + netpollblock(pd, mode); + err = checkerr(pd, mode); +ret: + runtime·unlock(pd); +} + +func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) { + runtime·lock(pd); + if(pd->closing) + goto ret; + pd->seq++; // invalidate current timers + // Reset current timers. + if(pd->rt.fv) { + runtime·deltimer(&pd->rt); + pd->rt.fv = nil; + } + if(pd->wt.fv) { + runtime·deltimer(&pd->wt); + pd->wt.fv = nil; + } + // Setup new timers. + if(d != 0 && d <= runtime·nanotime()) { + d = -1; + } + if(mode == 'r' || mode == 'r'+'w') + pd->rd = d; + if(mode == 'w' || mode == 'r'+'w') + pd->wd = d; + if(pd->rd > 0 && pd->rd == pd->wd) { + pd->rt.fv = &deadlineFn; + pd->rt.when = pd->rd; + // Copy current seq into the timer arg. + // Timer func will check the seq against current descriptor seq, + // if they differ the descriptor was reused or timers were reset. + pd->rt.arg.type = (Type*)pd->seq; + pd->rt.arg.data = pd; + runtime·addtimer(&pd->rt); + } else { + if(pd->rd > 0) { + pd->rt.fv = &readDeadlineFn; + pd->rt.when = pd->rd; + pd->rt.arg.type = (Type*)pd->seq; + pd->rt.arg.data = pd; + runtime·addtimer(&pd->rt); + } + if(pd->wd > 0) { + pd->wt.fv = &writeDeadlineFn; + pd->wt.when = pd->wd; + pd->wt.arg.type = (Type*)pd->seq; + pd->wt.arg.data = pd; + runtime·addtimer(&pd->wt); + } + } +ret: + runtime·unlock(pd); +} + +func runtime_pollUnblock(pd *PollDesc) { + G *rg, *wg; + + runtime·lock(pd); + if(pd->closing) + runtime·throw("runtime_pollUnblock: already closing"); + pd->closing = true; + pd->seq++; + rg = netpollunblock(pd, 'r'); + wg = netpollunblock(pd, 'w'); + if(pd->rt.fv) { + runtime·deltimer(&pd->rt); + pd->rt.fv = nil; + } + if(pd->wt.fv) { + runtime·deltimer(&pd->wt); + pd->wt.fv = nil; + } + runtime·unlock(pd); + if(rg) + runtime·ready(rg); + if(wg) + runtime·ready(wg); +} + +// make pd ready, newly runnable goroutines (if any) are enqueued info gpp list +void +runtime·netpollready(G **gpp, PollDesc *pd, int32 mode) +{ + G *rg, *wg; + + rg = wg = nil; + runtime·lock(pd); + if(mode == 'r' || mode == 'r'+'w') + rg = netpollunblock(pd, 'r'); + if(mode == 'w' || mode == 'r'+'w') + wg = netpollunblock(pd, 'w'); + runtime·unlock(pd); + if(rg) { + rg->schedlink = *gpp; + *gpp = rg; + } + if(wg) { + wg->schedlink = *gpp; + *gpp = wg; + } +} + +static intgo +checkerr(PollDesc *pd, int32 mode) +{ + if(pd->closing) + return 1; // errClosing + if((mode == 'r' && pd->rd < 0) || (mode == 'w' && pd->wd < 0)) + return 2; // errTimeout + return 0; +} + +static void +netpollblock(PollDesc *pd, int32 mode) +{ + G **gpp; + + gpp = &pd->rg; + if(mode == 'w') + gpp = &pd->wg; + if(*gpp == READY) { + *gpp = nil; + return; + } + if(*gpp != nil) + runtime·throw("epoll: double wait"); + *gpp = g; + runtime·park(runtime·unlock, &pd->Lock, "IO wait"); + runtime·lock(pd); +} + +static G* +netpollunblock(PollDesc *pd, int32 mode) +{ + G **gpp, *old; + + gpp = &pd->rg; + if(mode == 'w') + gpp = &pd->wg; + if(*gpp == READY) + return nil; + if(*gpp == nil) { + *gpp = READY; + return nil; + } + old = *gpp; + *gpp = nil; + return old; +} + +static void +deadlineimpl(int64 now, Eface arg, bool read, bool write) +{ + PollDesc *pd; + uint32 seq; + G *rg, *wg; + + USED(now); + pd = (PollDesc*)arg.data; + // This is the seq when the timer was set. + // If it's stale, ignore the timer event. + seq = (uintptr)arg.type; + rg = wg = nil; + runtime·lock(pd); + if(seq != pd->seq) { + // The descriptor was reused or timers were reset. + runtime·unlock(pd); + return; + } + if(read) { + if(pd->rd <= 0 || pd->rt.fv == nil) + runtime·throw("deadlineimpl: inconsistent read deadline"); + pd->rd = -1; + pd->rt.fv = nil; + rg = netpollunblock(pd, 'r'); + } + if(write) { + if(pd->wd <= 0 || (pd->wt.fv == nil && !read)) + runtime·throw("deadlineimpl: inconsistent write deadline"); + pd->wd = -1; + pd->wt.fv = nil; + wg = netpollunblock(pd, 'w'); + } + runtime·unlock(pd); + if(rg) + runtime·ready(rg); + if(wg) + runtime·ready(wg); +} + +static void +deadline(int64 now, Eface arg) +{ + deadlineimpl(now, arg, true, true); +} + +static void +readDeadline(int64 now, Eface arg) +{ + deadlineimpl(now, arg, true, false); +} + +static void +writeDeadline(int64 now, Eface arg) +{ + deadlineimpl(now, arg, false, true); +} + +static PollDesc* +allocPollDesc(void) +{ + PollDesc *pd; + uint32 i, n; + + runtime·lock(&pollcache); + if(pollcache.first == nil) { + n = PageSize/sizeof(*pd); + if(n == 0) + n = 1; + // Must be in non-GC memory because can be referenced + // only from epoll/kqueue internals. + pd = runtime·SysAlloc(n*sizeof(*pd)); + for(i = 0; i < n; i++) { + pd[i].link = pollcache.first; + pollcache.first = &pd[i]; + } + } + pd = pollcache.first; + pollcache.first = pd->link; + runtime·unlock(&pollcache); + return pd; +} diff --git a/src/pkg/runtime/netpoll_kqueue.c b/src/pkg/runtime/netpoll_kqueue.c new file mode 100644 index 0000000000..7603260565 --- /dev/null +++ b/src/pkg/runtime/netpoll_kqueue.c @@ -0,0 +1,96 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build darwin + +#include "runtime.h" +#include "defs_GOOS_GOARCH.h" + +// Integrated network poller (kqueue-based implementation). + +int32 runtime·kqueue(void); +int32 runtime·kevent(int32, Kevent*, int32, Kevent*, int32, Timespec*); +void runtime·closeonexec(int32); + +static int32 kq = -1; + +void +runtime·netpollinit(void) +{ + kq = runtime·kqueue(); + if(kq < 0) { + runtime·printf("netpollinit: kqueue failed with %d\n", -kq); + runtime·throw("netpollinit: kqueue failed"); + } + runtime·closeonexec(kq); +} + +int32 +runtime·netpollopen(int32 fd, PollDesc *pd) +{ + Kevent ev[2]; + int32 n; + + // Arm both EVFILT_READ and EVFILT_WRITE in edge-triggered mode (EV_CLEAR) + // for the whole fd lifetime. The notifications are automatically unregistered + // when fd is closed. + ev[0].ident = fd; + ev[0].filter = EVFILT_READ; + ev[0].flags = EV_ADD|EV_RECEIPT|EV_CLEAR; + ev[0].fflags = 0; + ev[0].data = 0; + ev[0].udata = (byte*)pd; + ev[1] = ev[0]; + ev[1].filter = EVFILT_WRITE; + n = runtime·kevent(kq, ev, 2, ev, 2, nil); + if(n < 0) + return -n; + if(n != 2 || + (ev[0].flags&EV_ERROR) == 0 || ev[0].ident != fd || ev[0].filter != EVFILT_READ || + (ev[1].flags&EV_ERROR) == 0 || ev[1].ident != fd || ev[1].filter != EVFILT_WRITE) + return EFAULT; // just to mark out from other errors + if(ev[0].data != 0) + return ev[0].data; + if(ev[1].data != 0) + return ev[1].data; + return 0; +} + +// Polls for ready network connections. +// Returns list of goroutines that become runnable. +G* +runtime·netpoll(bool block) +{ + Kevent events[64], *ev; + Timespec ts, *tp; + int32 n, i; + G *gp; + + if(kq == -1) + return nil; + tp = nil; + if(!block) { + ts.tv_sec = 0; + ts.tv_nsec = 0; + tp = &ts; + } + gp = nil; +retry: + n = runtime·kevent(kq, nil, 0, events, nelem(events), tp); + if(n < 0) { + if(n != -EINTR) + runtime·printf("kqueue failed with %d\n", -n); + goto retry; + } + for(i = 0; i < n; i++) { + ev = &events[i]; + if(ev->filter == EVFILT_READ) + runtime·netpollready(&gp, (PollDesc*)ev->udata, 'r'); + if(ev->filter == EVFILT_WRITE) + runtime·netpollready(&gp, (PollDesc*)ev->udata, 'w'); + } + if(block && gp == nil) + goto retry; + return gp; +} diff --git a/src/pkg/runtime/netpoll_stub.c b/src/pkg/runtime/netpoll_stub.c index 4dba88620a..db2b1ee13f 100644 --- a/src/pkg/runtime/netpoll_stub.c +++ b/src/pkg/runtime/netpoll_stub.c @@ -2,6 +2,8 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. +// +build freebsd linux netbsd openbsd plan9 windows + #include "runtime.h" // Polls for ready network connections. diff --git a/src/pkg/runtime/runtime.h b/src/pkg/runtime/runtime.h index 026c7a5375..8858922b75 100644 --- a/src/pkg/runtime/runtime.h +++ b/src/pkg/runtime/runtime.h @@ -85,6 +85,7 @@ typedef struct LFNode LFNode; typedef struct ParFor ParFor; typedef struct ParForThread ParForThread; typedef struct CgoMal CgoMal; +typedef struct PollDesc PollDesc; /* * Per-CPU declaration. @@ -786,6 +787,9 @@ extern int64 runtime·blockprofilerate; void runtime·addtimer(Timer*); bool runtime·deltimer(Timer*); G* runtime·netpoll(bool); +void runtime·netpollinit(void); +int32 runtime·netpollopen(int32, PollDesc*); +void runtime·netpollready(G**, PollDesc*, int32); #pragma varargck argpos runtime·printf 1 #pragma varargck type "d" int32 diff --git a/src/pkg/runtime/sys_darwin_386.s b/src/pkg/runtime/sys_darwin_386.s index d27abc7bae..99cd8e7611 100644 --- a/src/pkg/runtime/sys_darwin_386.s +++ b/src/pkg/runtime/sys_darwin_386.s @@ -488,3 +488,32 @@ TEXT runtime·sysctl(SB),7,$0 RET MOVL $0, AX RET + +// int32 runtime·kqueue(void); +TEXT runtime·kqueue(SB),7,$0 + MOVL $362, AX + INT $0x80 + JAE 2(PC) + NEGL AX + RET + +// int32 runtime·kevent(int kq, Kevent *changelist, int nchanges, Kevent *eventlist, int nevents, Timespec *timeout); +TEXT runtime·kevent(SB),7,$0 + MOVL $363, AX + INT $0x80 + JAE 2(PC) + NEGL AX + RET + +// int32 runtime·closeonexec(int32 fd); +TEXT runtime·closeonexec(SB),7,$32 + MOVL $92, AX // fcntl + // 0(SP) is where the caller PC would be; kernel skips it + MOVL fd+0(FP), BX + MOVL BX, 4(SP) // fd + MOVL $2, 8(SP) // F_SETFD + MOVL $1, 12(SP) // FD_CLOEXEC + INT $0x80 + JAE 2(PC) + NEGL AX + RET diff --git a/src/pkg/runtime/sys_darwin_amd64.s b/src/pkg/runtime/sys_darwin_amd64.s index b8ae01aa20..271d43a31b 100644 --- a/src/pkg/runtime/sys_darwin_amd64.s +++ b/src/pkg/runtime/sys_darwin_amd64.s @@ -439,3 +439,37 @@ TEXT runtime·sysctl(SB),7,$0 RET MOVL $0, AX RET + +// int32 runtime·kqueue(void); +TEXT runtime·kqueue(SB),7,$0 + MOVQ $0, DI + MOVQ $0, SI + MOVQ $0, DX + MOVL $(0x2000000+362), AX + SYSCALL + JCC 2(PC) + NEGL AX + RET + +// int32 runtime·kevent(int kq, Kevent *changelist, int nchanges, Kevent *eventlist, int nevents, Timespec *timeout); +TEXT runtime·kevent(SB),7,$0 + MOVL 8(SP), DI + MOVQ 16(SP), SI + MOVL 24(SP), DX + MOVQ 32(SP), R10 + MOVL 40(SP), R8 + MOVQ 48(SP), R9 + MOVL $(0x2000000+363), AX + SYSCALL + JCC 2(PC) + NEGL AX + RET + +// void runtime·closeonexec(int32 fd); +TEXT runtime·closeonexec(SB),7,$0 + MOVL 8(SP), DI // fd + MOVQ $2, SI // F_SETFD + MOVQ $1, DX // FD_CLOEXEC + MOVL $(0x2000000+92), AX // fcntl + SYSCALL + RET -- 2.48.1