]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: integrated network poller for darwin
authorDmitriy Vyukov <dvyukov@google.com>
Thu, 14 Mar 2013 06:38:37 +0000 (10:38 +0400)
committerDmitriy Vyukov <dvyukov@google.com>
Thu, 14 Mar 2013 06:38:37 +0000 (10:38 +0400)
vs tip:
benchmark                           old ns/op    new ns/op    delta
BenchmarkTCP4Persistent                 67786        33175  -51.06%
BenchmarkTCP4Persistent-2               49085        31227  -36.38%
BenchmarkTCP4PersistentTimeout          69265        32565  -52.98%
BenchmarkTCP4PersistentTimeout-2        49217        32588  -33.79%

vs old scheduler:
benchmark                           old ns/op    new ns/op    delta
BenchmarkTCP4Persistent                 63517        33175  -47.77%
BenchmarkTCP4Persistent-2               54760        31227  -42.97%
BenchmarkTCP4PersistentTimeout          63234        32565  -48.50%
BenchmarkTCP4PersistentTimeout-2        56956        32588  -42.78%

R=golang-dev, bradfitz, devon.odell, mikioh.mikioh, iant, rsc
CC=golang-dev, pabuhr
https://golang.org/cl/7569043

12 files changed:
src/pkg/net/fd_darwin.go [deleted file]
src/pkg/net/fd_poll_runtime.go [new file with mode: 0644]
src/pkg/net/fd_poll_unix.go
src/pkg/runtime/defs_darwin.go
src/pkg/runtime/defs_darwin_386.h
src/pkg/runtime/defs_darwin_amd64.h
src/pkg/runtime/netpoll.goc [new file with mode: 0644]
src/pkg/runtime/netpoll_kqueue.c [new file with mode: 0644]
src/pkg/runtime/netpoll_stub.c
src/pkg/runtime/runtime.h
src/pkg/runtime/sys_darwin_386.s
src/pkg/runtime/sys_darwin_amd64.s

diff --git a/src/pkg/net/fd_darwin.go b/src/pkg/net/fd_darwin.go
deleted file mode 100644 (file)
index 382465b..0000000
+++ /dev/null
@@ -1,126 +0,0 @@
-// Copyright 2009 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// Waiting for FDs via kqueue/kevent.
-
-package net
-
-import (
-       "errors"
-       "os"
-       "syscall"
-)
-
-type pollster struct {
-       kq       int
-       eventbuf [10]syscall.Kevent_t
-       events   []syscall.Kevent_t
-
-       // An event buffer for AddFD/DelFD.
-       // Must hold pollServer lock.
-       kbuf [1]syscall.Kevent_t
-}
-
-func newpollster() (p *pollster, err error) {
-       p = new(pollster)
-       if p.kq, err = syscall.Kqueue(); err != nil {
-               return nil, os.NewSyscallError("kqueue", err)
-       }
-       syscall.CloseOnExec(p.kq)
-       p.events = p.eventbuf[0:0]
-       return p, nil
-}
-
-// First return value is whether the pollServer should be woken up.
-// This version always returns false.
-func (p *pollster) AddFD(fd int, mode int, repeat bool) (bool, error) {
-       // pollServer is locked.
-
-       var kmode int
-       if mode == 'r' {
-               kmode = syscall.EVFILT_READ
-       } else {
-               kmode = syscall.EVFILT_WRITE
-       }
-       ev := &p.kbuf[0]
-       // EV_ADD - add event to kqueue list
-       // EV_RECEIPT - generate fake EV_ERROR as result of add,
-       //      rather than waiting for real event
-       // EV_ONESHOT - delete the event the first time it triggers
-       flags := syscall.EV_ADD | syscall.EV_RECEIPT
-       if !repeat {
-               flags |= syscall.EV_ONESHOT
-       }
-       syscall.SetKevent(ev, fd, kmode, flags)
-
-       n, err := syscall.Kevent(p.kq, p.kbuf[:], p.kbuf[:], nil)
-       if err != nil {
-               return false, os.NewSyscallError("kevent", err)
-       }
-       if n != 1 || (ev.Flags&syscall.EV_ERROR) == 0 || int(ev.Ident) != fd || int(ev.Filter) != kmode {
-               return false, errors.New("kqueue phase error")
-       }
-       if ev.Data != 0 {
-               return false, syscall.Errno(ev.Data)
-       }
-       return false, nil
-}
-
-// Return value is whether the pollServer should be woken up.
-// This version always returns false.
-func (p *pollster) DelFD(fd int, mode int) bool {
-       // pollServer is locked.
-
-       var kmode int
-       if mode == 'r' {
-               kmode = syscall.EVFILT_READ
-       } else {
-               kmode = syscall.EVFILT_WRITE
-       }
-       ev := &p.kbuf[0]
-       // EV_DELETE - delete event from kqueue list
-       // EV_RECEIPT - generate fake EV_ERROR as result of add,
-       //      rather than waiting for real event
-       syscall.SetKevent(ev, fd, kmode, syscall.EV_DELETE|syscall.EV_RECEIPT)
-       syscall.Kevent(p.kq, p.kbuf[0:], p.kbuf[0:], nil)
-       return false
-}
-
-func (p *pollster) WaitFD(s *pollServer, nsec int64) (fd int, mode int, err error) {
-       var t *syscall.Timespec
-       for len(p.events) == 0 {
-               if nsec > 0 {
-                       if t == nil {
-                               t = new(syscall.Timespec)
-                       }
-                       *t = syscall.NsecToTimespec(nsec)
-               }
-
-               s.Unlock()
-               n, err := syscall.Kevent(p.kq, nil, p.eventbuf[:], t)
-               s.Lock()
-
-               if err != nil {
-                       if err == syscall.EINTR {
-                               continue
-                       }
-                       return -1, 0, os.NewSyscallError("kevent", nil)
-               }
-               if n == 0 {
-                       return -1, 0, nil
-               }
-               p.events = p.eventbuf[:n]
-       }
-       ev := &p.events[0]
-       p.events = p.events[1:]
-       fd = int(ev.Ident)
-       if ev.Filter == syscall.EVFILT_READ {
-               mode = 'r'
-       } else {
-               mode = 'w'
-       }
-       return fd, mode, nil
-}
-
-func (p *pollster) Close() error { return os.NewSyscallError("close", syscall.Close(p.kq)) }
diff --git a/src/pkg/net/fd_poll_runtime.go b/src/pkg/net/fd_poll_runtime.go
new file mode 100644 (file)
index 0000000..2c8b47e
--- /dev/null
@@ -0,0 +1,119 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build darwin
+
+package net
+
+import (
+       "sync"
+       "syscall"
+       "time"
+)
+
+func runtime_pollServerInit()
+func runtime_pollOpen(fd int) (uintptr, int)
+func runtime_pollClose(ctx uintptr)
+func runtime_pollWait(ctx uintptr, mode int) int
+func runtime_pollReset(ctx uintptr, mode int) int
+func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)
+func runtime_pollUnblock(ctx uintptr)
+
+var canCancelIO = true // used for testing current package
+
+type pollDesc struct {
+       runtimeCtx uintptr
+}
+
+var serverInit sync.Once
+
+func sysInit() {
+}
+
+func (pd *pollDesc) Init(fd *netFD) error {
+       serverInit.Do(runtime_pollServerInit)
+       ctx, errno := runtime_pollOpen(fd.sysfd)
+       if errno != 0 {
+               return syscall.Errno(errno)
+       }
+       pd.runtimeCtx = ctx
+       return nil
+}
+
+func (pd *pollDesc) Close() {
+       runtime_pollClose(pd.runtimeCtx)
+}
+
+func (pd *pollDesc) Lock() {
+}
+
+func (pd *pollDesc) Unlock() {
+}
+
+func (pd *pollDesc) Wakeup() {
+}
+
+// Evict evicts fd from the pending list, unblocking any I/O running on fd.
+// Return value is whether the pollServer should be woken up.
+func (pd *pollDesc) Evict() bool {
+       runtime_pollUnblock(pd.runtimeCtx)
+       return false
+}
+
+func (pd *pollDesc) PrepareRead() error {
+       res := runtime_pollReset(pd.runtimeCtx, 'r')
+       return convertErr(res)
+}
+
+func (pd *pollDesc) PrepareWrite() error {
+       res := runtime_pollReset(pd.runtimeCtx, 'w')
+       return convertErr(res)
+}
+
+func (pd *pollDesc) WaitRead() error {
+       res := runtime_pollWait(pd.runtimeCtx, 'r')
+       return convertErr(res)
+}
+
+func (pd *pollDesc) WaitWrite() error {
+       res := runtime_pollWait(pd.runtimeCtx, 'w')
+       return convertErr(res)
+}
+
+func convertErr(res int) error {
+       switch res {
+       case 0:
+               return nil
+       case 1:
+               return errClosing
+       case 2:
+               return errTimeout
+       }
+       panic("unreachable")
+}
+
+func setReadDeadline(fd *netFD, t time.Time) error {
+       return setDeadlineImpl(fd, t, 'r')
+}
+
+func setWriteDeadline(fd *netFD, t time.Time) error {
+       return setDeadlineImpl(fd, t, 'w')
+}
+
+func setDeadline(fd *netFD, t time.Time) error {
+       return setDeadlineImpl(fd, t, 'r'+'w')
+}
+
+func setDeadlineImpl(fd *netFD, t time.Time, mode int) error {
+       d := t.UnixNano()
+       if t.IsZero() {
+               d = 0
+       }
+       if err := fd.incref(false); err != nil {
+               return err
+       }
+       runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
+       fd.decref()
+       return nil
+}
index 7f7f764f9c72861cca8a6a8b8c93c46aa14df6ed..c08eb0033e455ac01e41c87c74a3cd5f6a72531e 100644 (file)
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-// +build darwin freebsd linux netbsd openbsd
+// +build freebsd linux netbsd openbsd
 
 package net
 
index 7f22b0b8e55dacfdb14382d16efe0213ff8bdaaf..722013ba968581f218e6d0752643fb55e0f00db3 100644 (file)
@@ -7,8 +7,8 @@
 /*
 Input to cgo.
 
-GOARCH=amd64 cgo -cdefs defs_darwin.go >defs_darwin_amd64.h
-GOARCH=386 cgo -cdefs defs_darwin.go >defs_darwin_386.h
+GOARCH=amd64 go tool cgo -cdefs defs_darwin.go >defs_darwin_amd64.h
+GOARCH=386 go tool cgo -cdefs defs_darwin.go >defs_darwin_386.h
 */
 
 package runtime
@@ -19,12 +19,17 @@ package runtime
 #include <mach/message.h>
 #include <sys/types.h>
 #include <sys/time.h>
+#include <errno.h>
 #include <signal.h>
+#include <sys/event.h>
 #include <sys/mman.h>
 */
 import "C"
 
 const (
+       EINTR  = C.EINTR
+       EFAULT = C.EFAULT
+
        PROT_NONE  = C.PROT_NONE
        PROT_READ  = C.PROT_READ
        PROT_WRITE = C.PROT_WRITE
@@ -128,6 +133,14 @@ const (
        ITIMER_REAL    = C.ITIMER_REAL
        ITIMER_VIRTUAL = C.ITIMER_VIRTUAL
        ITIMER_PROF    = C.ITIMER_PROF
+
+       EV_ADD       = C.EV_ADD
+       EV_DELETE    = C.EV_DELETE
+       EV_CLEAR     = C.EV_CLEAR
+       EV_RECEIPT   = C.EV_RECEIPT
+       EV_ERROR     = C.EV_ERROR
+       EVFILT_READ  = C.EVFILT_READ
+       EVFILT_WRITE = C.EVFILT_WRITE
 )
 
 type MachBody C.mach_msg_body_t
@@ -144,6 +157,7 @@ type Sigval C.union_sigval
 type Siginfo C.siginfo_t
 type Timeval C.struct_timeval
 type Itimerval C.struct_itimerval
+type Timespec C.struct_timespec
 
 type FPControl C.struct_fp_control
 type FPStatus C.struct_fp_status
@@ -161,3 +175,5 @@ type ExceptionState32 C.struct_i386_exception_state
 type Mcontext32 C.struct_mcontext32
 
 type Ucontext C.struct_ucontext
+
+type Kevent C.struct_kevent
index 92732f460246e423c9683351f2942228c5e78fc7..7b210eebf8f0d7854cdfe27fb3f4d49090218f87 100644 (file)
@@ -3,6 +3,9 @@
 
 
 enum {
+       EINTR   = 0x4,
+       EFAULT  = 0xe,
+
        PROT_NONE       = 0x0,
        PROT_READ       = 0x1,
        PROT_WRITE      = 0x2,
@@ -106,6 +109,14 @@ enum {
        ITIMER_REAL     = 0x0,
        ITIMER_VIRTUAL  = 0x1,
        ITIMER_PROF     = 0x2,
+
+       EV_ADD          = 0x1,
+       EV_DELETE       = 0x2,
+       EV_CLEAR        = 0x20,
+       EV_RECEIPT      = 0x40,
+       EV_ERROR        = 0x4000,
+       EVFILT_READ     = -0x1,
+       EVFILT_WRITE    = -0x2,
 };
 
 typedef struct MachBody MachBody;
@@ -117,6 +128,7 @@ typedef struct Sigaction Sigaction;
 typedef struct Siginfo Siginfo;
 typedef struct Timeval Timeval;
 typedef struct Itimerval Itimerval;
+typedef struct Timespec Timespec;
 typedef struct FPControl FPControl;
 typedef struct FPStatus FPStatus;
 typedef struct RegMMST RegMMST;
@@ -130,6 +142,7 @@ typedef struct FloatState32 FloatState32;
 typedef struct ExceptionState32 ExceptionState32;
 typedef struct Mcontext32 Mcontext32;
 typedef struct Ucontext Ucontext;
+typedef struct Kevent Kevent;
 
 #pragma pack on
 
@@ -170,7 +183,7 @@ struct StackT {
 typedef        byte    Sighandler[4];
 
 struct Sigaction {
-       Sighandler      __sigaction_u;
+       byte    __sigaction_u[4];
        void    *sa_tramp;
        uint32  sa_mask;
        int32   sa_flags;
@@ -185,7 +198,7 @@ struct Siginfo {
        uint32  si_uid;
        int32   si_status;
        byte    *si_addr;
-       Sigval  si_value;
+       byte    si_value[4];
        int32   si_band;
        uint32  __pad[7];
 };
@@ -197,6 +210,10 @@ struct Itimerval {
        Timeval it_interval;
        Timeval it_value;
 };
+struct Timespec {
+       int32   tv_sec;
+       int32   tv_nsec;
+};
 
 struct FPControl {
        byte    Pad_cgo_0[2];
@@ -362,5 +379,14 @@ struct Ucontext {
        Mcontext32      *uc_mcontext;
 };
 
+struct Kevent {
+       uint32  ident;
+       int16   filter;
+       uint16  flags;
+       uint32  fflags;
+       int32   data;
+       byte    *udata;
+};
+
 
 #pragma pack off
index d4fbfef4995001a1451d6cd5d7ece883631d6c80..2d464a9e50607a0668c83d67c7855dfc335b3b1c 100644 (file)
@@ -3,6 +3,9 @@
 
 
 enum {
+       EINTR   = 0x4,
+       EFAULT  = 0xe,
+
        PROT_NONE       = 0x0,
        PROT_READ       = 0x1,
        PROT_WRITE      = 0x2,
@@ -106,6 +109,14 @@ enum {
        ITIMER_REAL     = 0x0,
        ITIMER_VIRTUAL  = 0x1,
        ITIMER_PROF     = 0x2,
+
+       EV_ADD          = 0x1,
+       EV_DELETE       = 0x2,
+       EV_CLEAR        = 0x20,
+       EV_RECEIPT      = 0x40,
+       EV_ERROR        = 0x4000,
+       EVFILT_READ     = -0x1,
+       EVFILT_WRITE    = -0x2,
 };
 
 typedef struct MachBody MachBody;
@@ -117,6 +128,7 @@ typedef struct Sigaction Sigaction;
 typedef struct Siginfo Siginfo;
 typedef struct Timeval Timeval;
 typedef struct Itimerval Itimerval;
+typedef struct Timespec Timespec;
 typedef struct FPControl FPControl;
 typedef struct FPStatus FPStatus;
 typedef struct RegMMST RegMMST;
@@ -130,6 +142,7 @@ typedef struct FloatState32 FloatState32;
 typedef struct ExceptionState32 ExceptionState32;
 typedef struct Mcontext32 Mcontext32;
 typedef struct Ucontext Ucontext;
+typedef struct Kevent Kevent;
 
 #pragma pack on
 
@@ -171,7 +184,7 @@ struct StackT {
 typedef        byte    Sighandler[8];
 
 struct Sigaction {
-       Sighandler      __sigaction_u;
+       byte    __sigaction_u[8];
        void    *sa_tramp;
        uint32  sa_mask;
        int32   sa_flags;
@@ -186,7 +199,7 @@ struct Siginfo {
        uint32  si_uid;
        int32   si_status;
        byte    *si_addr;
-       Sigval  si_value;
+       byte    si_value[8];
        int64   si_band;
        uint64  __pad[7];
 };
@@ -199,6 +212,10 @@ struct Itimerval {
        Timeval it_interval;
        Timeval it_value;
 };
+struct Timespec {
+       int64   tv_sec;
+       int64   tv_nsec;
+};
 
 struct FPControl {
        byte    Pad_cgo_0[2];
@@ -365,5 +382,14 @@ struct Ucontext {
        Mcontext64      *uc_mcontext;
 };
 
+struct Kevent {
+       uint64  ident;
+       int16   filter;
+       uint16  flags;
+       uint32  fflags;
+       int64   data;
+       byte    *udata;
+};
+
 
 #pragma pack off
diff --git a/src/pkg/runtime/netpoll.goc b/src/pkg/runtime/netpoll.goc
new file mode 100644 (file)
index 0000000..2c61952
--- /dev/null
@@ -0,0 +1,346 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+#include "runtime.h"
+#include "defs_GOOS_GOARCH.h"
+#include "arch_GOARCH.h"
+#include "malloc.h"
+
+// Integrated network poller (platform-independent part).
+// A particular implementation (epoll/kqueue) must define the following functions:
+// void runtime·netpollinit(void);                    // to initialize the poller
+// int32 runtime·netpollopen(int32 fd, PollDesc *pd); // to arm edge-triggered notifications
+                                                       // and associate fd with pd.
+// An implementation must call the following function to denote that the pd is ready.
+// void runtime·netpollready(G **gpp, PollDesc *pd, int32 mode);
+
+#define READY ((G*)1)
+
+struct PollDesc
+{
+       PollDesc* link; // in pollcache, protected by pollcache.Lock
+       Lock;           // protectes the following fields
+       bool    closing;
+       uintptr seq;    // protects from stale timers and ready notifications
+       G*      rg;     // G waiting for read or READY (binary semaphore)
+       Timer   rt;     // read deadline timer (set if rt.fv != nil)
+       int64   rd;     // read deadline
+       G*      wg;     // the same for writes
+       Timer   wt;
+       int64   wd;
+};
+
+static struct
+{
+       Lock;
+       PollDesc*       first;
+       // PollDesc objects must be type-stable,
+       // because we can get ready notification from epoll/kqueue
+       // after the descriptor is closed/reused.
+       // Stale notifications are detected using seq variable,
+       // seq is incremented when deadlines are changed or descriptor is reused.
+} pollcache;
+
+static void    netpollblock(PollDesc*, int32);
+static G*      netpollunblock(PollDesc*, int32);
+static void    deadline(int64, Eface);
+static void    readDeadline(int64, Eface);
+static void    writeDeadline(int64, Eface);
+static PollDesc*       allocPollDesc(void);
+static intgo   checkerr(PollDesc *pd, int32 mode);
+
+static FuncVal deadlineFn      = {(void(*)(void))deadline};
+static FuncVal readDeadlineFn  = {(void(*)(void))readDeadline};
+static FuncVal writeDeadlineFn = {(void(*)(void))writeDeadline};
+
+func runtime_pollServerInit() {
+       runtime·netpollinit();
+}
+
+func runtime_pollOpen(fd int) (pd *PollDesc, errno int) {
+       pd = allocPollDesc();
+       runtime·lock(pd);
+       if(pd->wg != nil && pd->wg != READY)
+               runtime·throw("runtime_pollOpen: blocked write on free descriptor");
+       if(pd->rg != nil && pd->rg != READY)
+               runtime·throw("runtime_pollOpen: blocked read on free descriptor");
+       pd->closing = false;
+       pd->seq++;
+       pd->rg = nil;
+       pd->rd = 0;
+       pd->wg = nil;
+       pd->wd = 0;
+       runtime·unlock(pd);
+
+       errno = runtime·netpollopen(fd, pd);
+}
+
+func runtime_pollClose(pd *PollDesc) {
+       if(!pd->closing)
+               runtime·throw("runtime_pollClose: close w/o unblock");
+       if(pd->wg != nil && pd->wg != READY)
+               runtime·throw("runtime_pollClose: blocked write on closing descriptor");
+       if(pd->rg != nil && pd->rg != READY)
+               runtime·throw("runtime_pollClose: blocked read on closing descriptor");
+       runtime·lock(&pollcache);
+       pd->link = pollcache.first;
+       pollcache.first = pd;
+       runtime·unlock(&pollcache);
+}
+
+func runtime_pollReset(pd *PollDesc, mode int) (err int) {
+       runtime·lock(pd);
+       err = checkerr(pd, mode);
+       if(err)
+               goto ret;
+       if(mode == 'r')
+               pd->rg = nil;
+       else if(mode == 'w')
+               pd->wg = nil;
+ret:
+       runtime·unlock(pd);
+}
+
+func runtime_pollWait(pd *PollDesc, mode int) (err int) {
+       runtime·lock(pd);
+       err = checkerr(pd, mode);
+       if(err)
+               goto ret;
+       netpollblock(pd, mode);
+       err = checkerr(pd, mode);
+ret:
+       runtime·unlock(pd);
+}
+
+func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) {
+       runtime·lock(pd);
+       if(pd->closing)
+               goto ret;
+       pd->seq++;  // invalidate current timers
+       // Reset current timers.
+       if(pd->rt.fv) {
+               runtime·deltimer(&pd->rt);
+               pd->rt.fv = nil;
+       }
+       if(pd->wt.fv) {
+               runtime·deltimer(&pd->wt);
+               pd->wt.fv = nil;
+       }
+       // Setup new timers.
+       if(d != 0 && d <= runtime·nanotime()) {
+               d = -1;
+       }
+       if(mode == 'r' || mode == 'r'+'w')
+               pd->rd = d;
+       if(mode == 'w' || mode == 'r'+'w')
+               pd->wd = d;
+       if(pd->rd > 0 && pd->rd == pd->wd) {
+               pd->rt.fv = &deadlineFn;
+               pd->rt.when = pd->rd;
+               // Copy current seq into the timer arg.
+               // Timer func will check the seq against current descriptor seq,
+               // if they differ the descriptor was reused or timers were reset.
+               pd->rt.arg.type = (Type*)pd->seq;
+               pd->rt.arg.data = pd;
+               runtime·addtimer(&pd->rt);
+       } else {
+               if(pd->rd > 0) {
+                       pd->rt.fv = &readDeadlineFn;
+                       pd->rt.when = pd->rd;
+                       pd->rt.arg.type = (Type*)pd->seq;
+                       pd->rt.arg.data = pd;
+                       runtime·addtimer(&pd->rt);
+               }
+               if(pd->wd > 0) {
+                       pd->wt.fv = &writeDeadlineFn;
+                       pd->wt.when = pd->wd;
+                       pd->wt.arg.type = (Type*)pd->seq;
+                       pd->wt.arg.data = pd;
+                       runtime·addtimer(&pd->wt);
+               }
+       }
+ret:
+       runtime·unlock(pd);
+}
+
+func runtime_pollUnblock(pd *PollDesc) {
+       G *rg, *wg;
+
+       runtime·lock(pd);
+       if(pd->closing)
+               runtime·throw("runtime_pollUnblock: already closing");
+       pd->closing = true;
+       pd->seq++;
+       rg = netpollunblock(pd, 'r');
+       wg = netpollunblock(pd, 'w');
+       if(pd->rt.fv) {
+               runtime·deltimer(&pd->rt);
+               pd->rt.fv = nil;
+       }
+       if(pd->wt.fv) {
+               runtime·deltimer(&pd->wt);
+               pd->wt.fv = nil;
+       }
+       runtime·unlock(pd);
+       if(rg)
+               runtime·ready(rg);
+       if(wg)
+               runtime·ready(wg);
+}
+
+// make pd ready, newly runnable goroutines (if any) are enqueued info gpp list
+void
+runtime·netpollready(G **gpp, PollDesc *pd, int32 mode)
+{
+       G *rg, *wg;
+
+       rg = wg = nil;
+       runtime·lock(pd);
+       if(mode == 'r' || mode == 'r'+'w')
+               rg = netpollunblock(pd, 'r');
+       if(mode == 'w' || mode == 'r'+'w')
+               wg = netpollunblock(pd, 'w');
+       runtime·unlock(pd);
+       if(rg) {
+               rg->schedlink = *gpp;
+               *gpp = rg;
+       }
+       if(wg) {
+               wg->schedlink = *gpp;
+               *gpp = wg;
+       }
+}
+
+static intgo
+checkerr(PollDesc *pd, int32 mode)
+{
+       if(pd->closing)
+               return 1;  // errClosing
+       if((mode == 'r' && pd->rd < 0) || (mode == 'w' && pd->wd < 0))
+               return 2;  // errTimeout
+       return 0;
+}
+
+static void
+netpollblock(PollDesc *pd, int32 mode)
+{
+       G **gpp;
+
+       gpp = &pd->rg;
+       if(mode == 'w')
+               gpp = &pd->wg;
+       if(*gpp == READY) {
+               *gpp = nil;
+               return;
+       }
+       if(*gpp != nil)
+               runtime·throw("epoll: double wait");
+       *gpp = g;
+       runtime·park(runtime·unlock, &pd->Lock, "IO wait");
+       runtime·lock(pd);
+}
+
+static G*
+netpollunblock(PollDesc *pd, int32 mode)
+{
+       G **gpp, *old;
+
+       gpp = &pd->rg;
+       if(mode == 'w')
+               gpp = &pd->wg;
+       if(*gpp == READY)
+               return nil;
+       if(*gpp == nil) {
+               *gpp = READY;
+               return nil;
+       }
+       old = *gpp;
+       *gpp = nil;
+       return old;
+}
+
+static void
+deadlineimpl(int64 now, Eface arg, bool read, bool write)
+{
+       PollDesc *pd;
+       uint32 seq;
+       G *rg, *wg;
+
+       USED(now);
+       pd = (PollDesc*)arg.data;
+       // This is the seq when the timer was set.
+       // If it's stale, ignore the timer event.
+       seq = (uintptr)arg.type;
+       rg = wg = nil;
+       runtime·lock(pd);
+       if(seq != pd->seq) {
+               // The descriptor was reused or timers were reset.
+               runtime·unlock(pd);
+               return;
+       }
+       if(read) {
+               if(pd->rd <= 0 || pd->rt.fv == nil)
+                       runtime·throw("deadlineimpl: inconsistent read deadline");
+               pd->rd = -1;
+               pd->rt.fv = nil;
+               rg = netpollunblock(pd, 'r');
+       }
+       if(write) {
+               if(pd->wd <= 0 || (pd->wt.fv == nil && !read))
+                       runtime·throw("deadlineimpl: inconsistent write deadline");
+               pd->wd = -1;
+               pd->wt.fv = nil;
+               wg = netpollunblock(pd, 'w');
+       }
+       runtime·unlock(pd);
+       if(rg)
+               runtime·ready(rg);
+       if(wg)
+               runtime·ready(wg);
+}
+
+static void
+deadline(int64 now, Eface arg)
+{
+       deadlineimpl(now, arg, true, true);
+}
+
+static void
+readDeadline(int64 now, Eface arg)
+{
+       deadlineimpl(now, arg, true, false);
+}
+
+static void
+writeDeadline(int64 now, Eface arg)
+{
+       deadlineimpl(now, arg, false, true);
+}
+
+static PollDesc*
+allocPollDesc(void)
+{
+       PollDesc *pd;
+       uint32 i, n;
+
+       runtime·lock(&pollcache);
+       if(pollcache.first == nil) {
+               n = PageSize/sizeof(*pd);
+               if(n == 0)
+                       n = 1;
+               // Must be in non-GC memory because can be referenced
+               // only from epoll/kqueue internals.
+               pd = runtime·SysAlloc(n*sizeof(*pd));
+               for(i = 0; i < n; i++) {
+                       pd[i].link = pollcache.first;
+                       pollcache.first = &pd[i];
+               }
+       }
+       pd = pollcache.first;
+       pollcache.first = pd->link;
+       runtime·unlock(&pollcache);
+       return pd;
+}
diff --git a/src/pkg/runtime/netpoll_kqueue.c b/src/pkg/runtime/netpoll_kqueue.c
new file mode 100644 (file)
index 0000000..7603260
--- /dev/null
@@ -0,0 +1,96 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build darwin
+
+#include "runtime.h"
+#include "defs_GOOS_GOARCH.h"
+
+// Integrated network poller (kqueue-based implementation).
+
+int32  runtime·kqueue(void);
+int32  runtime·kevent(int32, Kevent*, int32, Kevent*, int32, Timespec*);
+void   runtime·closeonexec(int32);
+
+static int32 kq = -1;
+
+void
+runtime·netpollinit(void)
+{
+       kq = runtime·kqueue();
+       if(kq < 0) {
+               runtime·printf("netpollinit: kqueue failed with %d\n", -kq);
+               runtime·throw("netpollinit: kqueue failed");
+       }
+       runtime·closeonexec(kq);
+}
+
+int32
+runtime·netpollopen(int32 fd, PollDesc *pd)
+{
+       Kevent ev[2];
+       int32 n;
+
+       // Arm both EVFILT_READ and EVFILT_WRITE in edge-triggered mode (EV_CLEAR)
+       // for the whole fd lifetime.  The notifications are automatically unregistered
+       // when fd is closed.
+       ev[0].ident = fd;
+       ev[0].filter = EVFILT_READ;
+       ev[0].flags = EV_ADD|EV_RECEIPT|EV_CLEAR;
+       ev[0].fflags = 0;
+       ev[0].data = 0;
+       ev[0].udata = (byte*)pd;
+       ev[1] = ev[0];
+       ev[1].filter = EVFILT_WRITE;
+       n = runtime·kevent(kq, ev, 2, ev, 2, nil);
+       if(n < 0)
+               return -n;
+       if(n != 2 ||
+               (ev[0].flags&EV_ERROR) == 0 || ev[0].ident != fd || ev[0].filter != EVFILT_READ ||
+               (ev[1].flags&EV_ERROR) == 0 || ev[1].ident != fd || ev[1].filter != EVFILT_WRITE)
+               return EFAULT;  // just to mark out from other errors
+       if(ev[0].data != 0)
+               return ev[0].data;
+       if(ev[1].data != 0)
+               return ev[1].data;
+       return 0;
+}
+
+// Polls for ready network connections.
+// Returns list of goroutines that become runnable.
+G*
+runtime·netpoll(bool block)
+{
+       Kevent events[64], *ev;
+       Timespec ts, *tp;
+       int32 n, i;
+       G *gp;
+
+       if(kq == -1)
+               return nil;
+       tp = nil;
+       if(!block) {
+               ts.tv_sec = 0;
+               ts.tv_nsec = 0;
+               tp = &ts;
+       }
+       gp = nil;
+retry:
+       n = runtime·kevent(kq, nil, 0, events, nelem(events), tp);
+       if(n < 0) {
+               if(n != -EINTR)
+                       runtime·printf("kqueue failed with %d\n", -n);
+               goto retry;
+       }
+       for(i = 0; i < n; i++) {
+               ev = &events[i];
+               if(ev->filter == EVFILT_READ)
+                       runtime·netpollready(&gp, (PollDesc*)ev->udata, 'r');
+               if(ev->filter == EVFILT_WRITE)
+                       runtime·netpollready(&gp, (PollDesc*)ev->udata, 'w');
+       }
+       if(block && gp == nil)
+               goto retry;
+       return gp;
+}
index 4dba88620ac590bb850b3520b1dde30862fcf981..db2b1ee13fcb2f47ae125c706fae83246ef3edbe 100644 (file)
@@ -2,6 +2,8 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
+// +build freebsd linux netbsd openbsd plan9 windows
+
 #include "runtime.h"
 
 // Polls for ready network connections.
index 026c7a537567db1d2657e57bead0fdf226e0a4a6..8858922b7510bb33052f09e95cc89cfb851c0ee0 100644 (file)
@@ -85,6 +85,7 @@ typedef       struct  LFNode          LFNode;
 typedef        struct  ParFor          ParFor;
 typedef        struct  ParForThread    ParForThread;
 typedef        struct  CgoMal          CgoMal;
+typedef        struct  PollDesc        PollDesc;
 
 /*
  * Per-CPU declaration.
@@ -786,6 +787,9 @@ extern int64 runtime·blockprofilerate;
 void   runtime·addtimer(Timer*);
 bool   runtime·deltimer(Timer*);
 G*     runtime·netpoll(bool);
+void   runtime·netpollinit(void);
+int32  runtime·netpollopen(int32, PollDesc*);
+void   runtime·netpollready(G**, PollDesc*, int32);
 
 #pragma        varargck        argpos  runtime·printf 1
 #pragma        varargck        type    "d"     int32
index d27abc7bae24b13a58debe374724de5338105ce4..99cd8e76110f29d2b170e75f9d720892966f77f4 100644 (file)
@@ -488,3 +488,32 @@ TEXT runtime·sysctl(SB),7,$0
        RET
        MOVL    $0, AX
        RET
+
+// int32 runtime·kqueue(void);
+TEXT runtime·kqueue(SB),7,$0
+       MOVL    $362, AX
+       INT     $0x80
+       JAE     2(PC)
+       NEGL    AX
+       RET
+
+// int32 runtime·kevent(int kq, Kevent *changelist, int nchanges, Kevent *eventlist, int nevents, Timespec *timeout);
+TEXT runtime·kevent(SB),7,$0
+       MOVL    $363, AX
+       INT     $0x80
+       JAE     2(PC)
+       NEGL    AX
+       RET
+
+// int32 runtime·closeonexec(int32 fd);
+TEXT runtime·closeonexec(SB),7,$32
+       MOVL    $92, AX  // fcntl
+       // 0(SP) is where the caller PC would be; kernel skips it
+       MOVL    fd+0(FP), BX
+       MOVL    BX, 4(SP)  // fd
+       MOVL    $2, 8(SP)  // F_SETFD
+       MOVL    $1, 12(SP)  // FD_CLOEXEC
+       INT     $0x80
+       JAE     2(PC)
+       NEGL    AX
+       RET
index b8ae01aa20d4fb86c8c9ccdd07b5a56df33c70a4..271d43a31befdce58f8f70bff33d0b4cdc5616bc 100644 (file)
@@ -439,3 +439,37 @@ TEXT runtime·sysctl(SB),7,$0
        RET
        MOVL    $0, AX
        RET
+
+// int32 runtime·kqueue(void);
+TEXT runtime·kqueue(SB),7,$0
+       MOVQ    $0, DI
+       MOVQ    $0, SI
+       MOVQ    $0, DX
+       MOVL    $(0x2000000+362), AX
+       SYSCALL
+       JCC     2(PC)
+       NEGL    AX
+       RET
+
+// int32 runtime·kevent(int kq, Kevent *changelist, int nchanges, Kevent *eventlist, int nevents, Timespec *timeout);
+TEXT runtime·kevent(SB),7,$0
+       MOVL    8(SP), DI
+       MOVQ    16(SP), SI
+       MOVL    24(SP), DX
+       MOVQ    32(SP), R10
+       MOVL    40(SP), R8
+       MOVQ    48(SP), R9
+       MOVL    $(0x2000000+363), AX
+       SYSCALL
+       JCC     2(PC)
+       NEGL    AX
+       RET
+
+// void runtime·closeonexec(int32 fd);
+TEXT runtime·closeonexec(SB),7,$0
+       MOVL    8(SP), DI  // fd
+       MOVQ    $2, SI  // F_SETFD
+       MOVQ    $1, DX  // FD_CLOEXEC
+       MOVL    $(0x2000000+92), AX  // fcntl
+       SYSCALL
+       RET