]> Cypherpunks repositories - gostls13.git/commitdiff
* implement Linux epoll for polling i/o
authorRuss Cox <rsc@golang.org>
Mon, 29 Sep 2008 20:37:00 +0000 (13:37 -0700)
committerRuss Cox <rsc@golang.org>
Mon, 29 Sep 2008 20:37:00 +0000 (13:37 -0700)
* isolate OS-specific polling goop in Pollster type
* move generic poll loop out of fd_darwin.go into fd.go

R=r
DELTA=782  (448 added, 281 deleted, 53 changed)
OCL=16108
CL=16119

src/lib/net/Makefile
src/lib/net/fd.go [new file with mode: 0644]
src/lib/net/fd_darwin.go
src/lib/net/fd_linux.go
src/lib/net/net_linux.go
src/lib/syscall/cast_amd64.s
src/lib/syscall/file_linux.go
src/lib/syscall/socket_linux.go
src/lib/syscall/types_amd64_linux.go

index 568377ddd651a86d673dd5eaf77ccbf1b4fe667c..afbfd1ed16f5236f6c22569167b13caa7ff77578 100644 (file)
@@ -3,7 +3,7 @@
 # license that can be found in the LICENSE file.
 
 # DO NOT EDIT.  Automatically generated by gobuild.
-# gobuild -m net fd_darwin.go net.go net_darwin.go ip.go
+# gobuild -m net fd_darwin.go fd.go net.go net_darwin.go ip.go
 O=6
 GC=$(O)g
 CC=$(O)c -w
@@ -38,9 +38,12 @@ O2=\
        net_$(GOOS).$O\
 
 O3=\
+       fd.$O\
+
+O4=\
        net.$O\
 
-$(PKG): a1 a2 a3
+$(PKG): a1 a2 a3 a4
 a1:    $(O1)
        $(AR) grc $(PKG) $(O1)
        rm -f $(O1)
@@ -50,8 +53,12 @@ a2:  $(O2)
 a3:    $(O3)
        $(AR) grc $(PKG) $(O3)
        rm -f $(O3)
+a4:    $(O4)
+       $(AR) grc $(PKG) $(O4)
+       rm -f $(O4)
 
 $(O1): nuke
 $(O2): a1
 $(O3): a2
+$(O4): a3
 
diff --git a/src/lib/net/fd.go b/src/lib/net/fd.go
new file mode 100644 (file)
index 0000000..a93dac0
--- /dev/null
@@ -0,0 +1,287 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// TODO(rsc): All the prints in this file should go to standard error.
+
+package net
+
+import (
+       "net";
+       "once";
+       "os";
+       "syscall";
+)
+
+// Network file descriptor.  Only intended to be used internally,
+// but have to export to make it available in other files implementing package net.
+export type FD struct {
+       // immutable until Close
+       fd int64;
+       osfd *os.FD;
+       cr *chan *FD;
+       cw *chan *FD;
+
+       // owned by fd wait server
+       ncr, ncw int;
+}
+
+// Make reads and writes on fd return EAGAIN instead of blocking.
+func SetNonblock(fd int64) *os.Error {
+       flags, e := syscall.fcntl(fd, syscall.F_GETFL, 0)
+       if e != 0 {
+               return os.ErrnoToError(e)
+       }
+       flags, e = syscall.fcntl(fd, syscall.F_SETFL, flags | syscall.O_NONBLOCK)
+       if e != 0 {
+               return os.ErrnoToError(e)
+       }
+       return nil
+}
+
+
+// A PollServer helps FDs determine when to retry a non-blocking
+// read or write after they get EAGAIN.  When an FD needs to wait,
+// send the fd on s.cr (for a read) or s.cw (for a write) to pass the
+// request to the poll server.  Then receive on fd.cr/fd.cw.
+// When the PollServer finds that i/o on FD should be possible
+// again, it will send fd on fd.cr/fd.cw to wake any waiting processes.
+// This protocol is implemented as s.WaitRead() and s.WaitWrite().
+//
+// There is one subtlety: when sending on s.cr/s.cw, the
+// poll server is probably in a system call, waiting for an fd
+// to become ready.  It's not looking at the request channels.
+// To resolve this, the poll server waits not just on the FDs it has
+// been given but also its own pipe.  After sending on the
+// buffered channel s.cr/s.cw, WaitRead/WaitWrite writes a
+// byte to the pipe, causing the PollServer's poll system call to
+// return.  In response to the pipe being readable, the PollServer
+// re-polls its request channels.
+//
+// Note that the ordering is "send request" and then "wake up server".
+// If the operations were reversed, there would be a race: the poll
+// server might wake up and look at the request channel, see that it
+// was empty, and go back to sleep, all before the requester managed
+// to send the request.  Because the send must complete before the wakeup,
+// the request channel must be buffered.  A buffer of size 1 is sufficient
+// for any request load.  If many processes are trying to submit requests,
+// one will succeed, the PollServer will read the request, and then the
+// channel will be empty for the next process's request.  A larger buffer
+// might help batch requests.
+
+type PollServer struct {
+       cr, cw *chan *FD;       // buffered >= 1
+       pr, pw *os.FD;
+       pending *map[int64] *FD;
+       poll *Pollster; // low-level OS hooks
+}
+func (s *PollServer) Run();
+
+func NewPollServer() (s *PollServer, err *os.Error) {
+       s = new(PollServer);
+       s.cr = new(chan *FD, 1);
+       s.cw = new(chan *FD, 1);
+       if s.pr, s.pw, err = os.Pipe(); err != nil {
+               return nil, err
+       }
+       if err = SetNonblock(s.pr.fd); err != nil {
+       Error:
+               s.pr.Close();
+               s.pw.Close()
+               return nil, err
+       }
+       if err = SetNonblock(s.pw.fd); err != nil {
+               goto Error
+       }
+       if s.poll, err = NewPollster(); err != nil {
+               goto Error
+       }
+       if err = s.poll.AddFD(s.pr.fd, 'r', true); err != nil {
+               s.poll.Close()
+               goto Error
+       }
+       s.pending = new(map[int64] *FD)
+       go s.Run()
+       return s, nil
+}
+
+func (s *PollServer) AddFD(fd *FD, mode int) {
+       if err := s.poll.AddFD(fd.fd, mode, false); err != nil {
+               print("PollServer AddFD: ", err.String(), "\n")
+               return
+       }
+
+       key := fd.fd << 1
+       if mode == 'r' {
+               fd.ncr++
+       } else {
+               fd.ncw++
+               key++
+       }
+       s.pending[key] = fd
+}
+
+func (s *PollServer) LookupFD(fd int64, mode int) *FD {
+       key := fd << 1
+       if mode == 'w' {
+               key++
+       }
+       netfd, ok := s.pending[key]
+       if !ok {
+               return nil
+       }
+       s.pending[key] = nil, false
+       return netfd
+}
+
+func (s *PollServer) Run() {
+       var scratch [100]byte;
+       for {
+               fd, mode, err := s.poll.WaitFD();
+               if err != nil {
+                       print("PollServer WaitFD: ", err.String(), "\n")
+                       return
+               }
+               if fd == s.pr.fd {
+                       // Drain our wakeup pipe.
+                       for nn, e := s.pr.Read(&scratch); nn > 0; {
+                               nn, e = s.pr.Read(&scratch)
+                       }
+
+                       // Read from channels
+                       for fd, ok := <-s.cr; ok; fd, ok = <-s.cr {
+                               s.AddFD(fd, 'r')
+                       }
+                       for fd, ok := <-s.cw; ok; fd, ok = <-s.cw {
+                               s.AddFD(fd, 'w')
+                       }
+               } else {
+                       netfd := s.LookupFD(fd, mode)
+                       if netfd == nil {
+                               print("PollServer: unexpected wakeup for fd=", netfd, " mode=", string(mode), "\n")
+                               continue
+                       }
+                       if mode == 'r' {
+                               for netfd.ncr > 0 {
+                                       netfd.ncr--
+                                       netfd.cr <- netfd
+                               }
+                       } else {
+                               for netfd.ncw > 0 {
+                                       netfd.ncw--
+                                       netfd.cw <- netfd
+                               }
+                       }
+               }
+       }
+}
+
+func (s *PollServer) Wakeup() {
+       var b [1]byte;
+       s.pw.Write(&b)
+}
+
+func (s *PollServer) WaitRead(fd *FD) {
+       s.cr <- fd;
+       s.Wakeup();
+       <-fd.cr
+}
+
+func (s *PollServer) WaitWrite(fd *FD) {
+       s.cr <- fd;
+       s.Wakeup();
+       <-fd.cr
+}
+
+
+// Network FD methods.
+// All the network FDs use a single PollServer.
+
+var pollserver *PollServer
+
+func StartServer() {
+       p, err := NewPollServer()
+       if err != nil {
+               print("Start PollServer: ", err.String(), "\n")
+       }
+       pollserver = p
+}
+
+export func NewFD(fd int64) (f *FD, err *os.Error) {
+       if pollserver == nil {
+               once.Do(&StartServer);
+       }
+       if err = SetNonblock(fd); err != nil {
+               return nil, err
+       }
+       f = new(FD);
+       f.fd = fd;
+       f.osfd = os.NewFD(fd);
+       f.cr = new(chan *FD, 1);
+       f.cw = new(chan *FD, 1);
+       return f, nil
+}
+
+func (fd *FD) Close() *os.Error {
+       if fd == nil || fd.osfd == nil {
+               return os.EINVAL
+       }
+       e := fd.osfd.Close();
+       fd.osfd = nil;
+       fd.fd = -1
+       return e
+}
+
+func (fd *FD) Read(p *[]byte) (n int, err *os.Error) {
+       if fd == nil || fd.osfd == nil {
+               return -1, os.EINVAL
+       }
+       n, err = fd.osfd.Read(p)
+       for err == os.EAGAIN {
+               pollserver.WaitRead(fd);
+               n, err = fd.osfd.Read(p)
+       }
+       return n, err
+}
+
+func (fd *FD) Write(p *[]byte) (n int, err *os.Error) {
+       if fd == nil || fd.osfd == nil {
+               return -1, os.EINVAL
+       }
+       err = nil;
+       nn := 0
+       for nn < len(p) && err == nil {
+               n, err = fd.osfd.Write(p[nn:len(p)]);
+               for err == os.EAGAIN {
+                       pollserver.WaitWrite(fd);
+                       n, err = fd.osfd.Write(p[nn:len(p)])
+               }
+               if n > 0 {
+                       nn += n
+               }
+               if n == 0 {
+                       break
+               }
+       }
+       return nn, err
+}
+
+func (fd *FD) Accept(sa *syscall.Sockaddr) (nfd *FD, err *os.Error) {
+       if fd == nil || fd.osfd == nil {
+               return nil, os.EINVAL
+       }
+       s, e := syscall.accept(fd.fd, sa)
+       for e == syscall.EAGAIN {
+               pollserver.WaitRead(fd);
+               s, e = syscall.accept(fd.fd, sa)
+       }
+       if e != 0 {
+               return nil, os.ErrnoToError(e)
+       }
+       if nfd, err = NewFD(s); err != nil {
+               syscall.close(s)
+               return nil, err
+       }
+       return nfd, nil
+}
+
index 23d085cccb71fc07a771eb140c7accce2d9057ab..16f0d4e025f8ef64027a9f53a12295eba315099a 100644 (file)
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-// TODO(rsc): All the prints in this file should go to standard error.
+// Waiting for FDs via kqueue/kevent.
 
 package net
 
 import (
        "net";
-       "once";
        "os";
        "syscall";
 )
 
-const Debug = false
-
-// Network file descriptor.  Only intended to be used internally,
-// but have to export to make it available in other files implementing package net.
-export type FD struct {
-       fd int64;
-       cr *chan *FD;
-       cw *chan *FD;
-
-       // owned by fd wait server
-       ncr, ncw int;
-       next *FD;
-}
-
-func WaitRead(fd *FD);
-func WaitWrite(fd *FD);
-func StartServer();
-
-func MakeNonblocking(fd int64) *os.Error {
-       if Debug { print("MakeNonBlocking ", fd, "\n") }
-       flags, e := syscall.fcntl(fd, syscall.F_GETFL, 0)
-       if e != 0 {
-               return os.ErrnoToError(e)
-       }
-       flags, e = syscall.fcntl(fd, syscall.F_SETFL, flags | syscall.O_NONBLOCK)
-       if e != 0 {
-               return os.ErrnoToError(e)
-       }
-       return nil
-}
-
-export func NewFD(fd int64) (f *FD, err *os.Error) {
-       once.Do(&StartServer);
-       if err = MakeNonblocking(fd); err != nil {
-               return nil, err
-       }
-       f = new(FD);
-       f.fd = fd;
-       f.cr = new(chan *FD);
-       f.cw = new(chan *FD);
-       return f, nil
-}
-
-func (fd *FD) Close() *os.Error {
-       if fd == nil {
-               return os.EINVAL
-       }
-       r1, e := syscall.close(fd.fd);
-       if e != 0 {
-               return os.ErrnoToError(e)
-       }
-       return nil
-}
-
-func (fd *FD) Read(p *[]byte) (n int, err *os.Error) {
-       if fd == nil {
-               return -1, os.EINVAL
-       }
-L:     nn, e := syscall.read(fd.fd, &p[0], int64(len(p)))
-       switch {
-       case e == syscall.EAGAIN:
-               WaitRead(fd)
-               goto L
-       case e != 0:
-               return -1, os.ErrnoToError(e)
-       }
-       return int(nn), nil
-}
-
-func (fd *FD) Write(p *[]byte) (n int, err *os.Error) {
-       if fd == nil {
-               return -1, os.EINVAL
-       }
-       total := len(p)
-       for len(p) > 0 {
-       L:      nn, e := syscall.write(fd.fd, &p[0], int64(len(p)))
-               switch {
-               case e == syscall.EAGAIN:
-                       WaitWrite(fd)
-                       goto L
-               case e != 0:
-                       return total - len(p), os.ErrnoToError(e)
-               }
-               p = p[nn:len(p)]
-       }
-       return total, nil
+export type Pollster struct {
+       kq int64;
+       eventbuf [10]syscall.Kevent;
+       events *[]syscall.Kevent;
 }
 
-func (fd *FD) Accept(sa *syscall.Sockaddr) (nfd *FD, err *os.Error) {
-       if fd == nil {
-               return nil, os.EINVAL
-       }
-L:     s, e := syscall.accept(fd.fd, sa)
-       switch {
-       case e == syscall.EAGAIN:
-               WaitRead(fd)
-               goto L
-       case e != 0:
+export func NewPollster() (p *Pollster, err *os.Error) {
+       p = new(Pollster);
+       var e int64;
+       if p.kq, e = syscall.kqueue(); e != 0 {
                return nil, os.ErrnoToError(e)
        }
-       nfd, err = NewFD(s)
-       if err != nil {
-               syscall.close(s)
-               return nil, err
-       }
-       return nfd, nil
-}
-
-
-// Waiting for FDs via kqueue(2).
-type Kstate struct {
-       cr *chan *FD;
-       cw *chan *FD;
-       pr *os.FD;
-       pw *os.FD;
-       pend *map[int64] *FD;
-       kq int64;
+       p.events = (&p.eventbuf)[0:0]
+       return p, nil
 }
 
-var kstate Kstate;
-
-func KqueueAdd(fd int64, mode byte, repeat bool) *os.Error {
-       if Debug { print("Kqueue add ", fd, " ", mode, " ", repeat, "\n") }
+func (p *Pollster) AddFD(fd int64, mode int, repeat bool) *os.Error {
        var kmode int16;
        if mode == 'r' {
                kmode = syscall.EVFILT_READ
        } else {
                kmode = syscall.EVFILT_WRITE
        }
-
        var events [1]syscall.Kevent;
        ev := &events[0];
        ev.ident = fd;
        ev.filter = kmode;
 
        // EV_ADD - add event to kqueue list
-       // EV_RECEIPT - generate fake EV_ERROR as result of add
+       // EV_RECEIPT - generate fake EV_ERROR as result of add,
+       //      rather than waiting for real event
        // EV_ONESHOT - delete the event the first time it triggers
        ev.flags = syscall.EV_ADD | syscall.EV_RECEIPT
        if !repeat {
                ev.flags |= syscall.EV_ONESHOT
        }
 
-       n, e := syscall.kevent(kstate.kq, &events, &events, nil);
+       n, e := syscall.kevent(p.kq, &events, &events, nil);
        if e != 0 {
                return os.ErrnoToError(e)
        }
@@ -169,166 +62,29 @@ func KqueueAdd(fd int64, mode byte, repeat bool) *os.Error {
        return nil
 }
 
-func KqueueAddFD(fd *FD, mode byte) *os.Error {
-       if e := KqueueAdd(fd.fd, 'r', false); e != nil {
-               return e
-       }
-       id := fd.fd << 1
-       if mode == 'r' {
-               fd.ncr++
-       } else {
-               id++
-               fd.ncw++
-       }
-       kstate.pend[id] = fd
-       return nil
-}
-
-func KqueueGet(events *[]syscall.Kevent) (n int, err *os.Error) {
-       var nn, e int64;
-       if nn, e = syscall.kevent(kstate.kq, nil, events, nil); e != 0 {
-               return -1, os.ErrnoToError(e)
+func (p *Pollster) WaitFD() (fd int64, mode int, err *os.Error) {
+       for len(p.events) == 0 {
+               nn, e := syscall.kevent(p.kq, nil, &p.eventbuf, nil)
+               if e != 0 {
+                       if e == syscall.EAGAIN || e == syscall.EINTR {
+                               continue
+                       }
+                       return -1, 0, os.ErrnoToError(e)
+               }
+               p.events = (&p.eventbuf)[0:nn]
        }
-       return int(nn),  nil
-}
-
-func KqueueLookup(ev *syscall.Kevent) (fd *FD, mode byte) {
-       id := ev.ident << 1
+       ev := &p.events[0];
+       p.events = p.events[1:len(p.events)];
+       fd = ev.ident;
        if ev.filter == syscall.EVFILT_READ {
                mode = 'r'
        } else {
-               id++
                mode = 'w'
        }
-       var ok bool
-       if fd, ok = kstate.pend[id]; !ok {
-               return nil, 0
-       }
-       kstate.pend[id] = nil, false
-       return fd, mode
-}
-
-func Serve() {
-       var r, e int64;
-       k := &kstate;
-
-       if Debug { print("Kqueue server running\n") }
-       var events [10]syscall.Kevent;
-       var scratch [100]byte;
-       for {
-               var n int
-               var err *os.Error;
-               if n, err = KqueueGet(&events); err != nil {
-                       print("kqueue get: ", err.String(), "\n")
-                       return
-               }
-               if Debug { print("Kqueue server get ", n, "\n") }
-               for i := 0; i < n; i++ {
-                       ev := &events[i]
-                       if ev.ident == k.pr.fd {
-                               if Debug { print("Kqueue server wakeup\n") }
-                               // Drain our wakeup pipe
-                               for {
-                                       nn, e := k.pr.Read(&scratch)
-                                       if Debug { print("Read ", k.pr.fd, " ", nn, " ", e.String(), "\n") }
-                                       if nn <= 0 {
-                                               break
-                                       }
-                               }
-
-                               if Debug { print("Kqueue server drain channels\n") }
-                               // Then read from channels.
-                               for {
-                                       fd, ok := <-k.cr
-                                       if !ok {
-                                               break
-                                       }
-                                       KqueueAddFD(fd, 'r')
-                               }
-                               for {
-                                       fd, ok := <-k.cw
-                                       if !ok {
-                                               break
-                                       }
-                                       KqueueAddFD(fd, 'w')
-                               }
-                               if Debug { print("Kqueue server awake\n") }
-                               continue
-                       }
-
-                       // Otherwise, wakeup the right FD.
-                       fd, mode := KqueueLookup(ev);
-                       if fd == nil {
-                               print("kqueue: unexpected wakeup for fd=", ev.ident, " filter=", ev.filter, "\n")
-                               continue
-                       }
-                       if mode == 'r' {
-                               if Debug { print("Kqueue server r fd=", fd.fd, " ncr=", fd.ncr, "\n") }
-                               for fd.ncr > 0 {
-                                       fd.ncr--
-                                       fd.cr <- fd
-                               }
-                       } else {
-                               if Debug { print("Kqueue server w fd=", fd.fd, " ncw=", fd.ncw, "\n") }
-                               for fd.ncw > 0 {
-                                       fd.ncw--
-                                       fd.cw <- fd
-                               }
-                       }
-               }
-       }
-}
-
-func StartServer() {
-       k := &kstate;
-
-       k.cr = new(chan *FD, 1);
-       k.cw = new(chan *FD, 1);
-       k.pend = new(map[int64] *FD)
-
-       var err *os.Error
-       if k.pr, k.pw, err = os.Pipe(); err != nil {
-               print("kqueue pipe: ", err.String(), "\n")
-               return
-       }
-
-       if err := MakeNonblocking(k.pr.fd); err != nil {
-               print("make nonblocking pr: ", err.String(), "\n")
-               return
-       }
-       if err := MakeNonblocking(k.pw.fd); err != nil {
-               print("make nonblocking pw: ", err.String(), "\n")
-               return
-       }
-
-       var e int64
-       if k.kq, e = syscall.kqueue(); e != 0 {
-               err := os.ErrnoToError(e);
-               print("kqueue: ", err.String(), "\n")
-               return
-       }
-
-       if err := KqueueAdd(k.pr.fd, 'r', true); err != nil {
-               print("kqueue add pipe: ", err.String(), "\n")
-               return
-       }
-
-       go Serve()
-}
-
-func WakeupServer() {
-       var b [1]byte;
-       kstate.pw.Write(&b);
-}
-
-func WaitRead(fd *FD) {
-       kstate.cr <- fd;
-       WakeupServer();
-       <-fd.cr
+       return fd, mode, nil
 }
 
-func WaitWrite(fd *FD) {
-       kstate.cw <- fd;
-       WakeupServer();
-       <-fd.cw
+func (p *Pollster) Close() *os.Error {
+       r, e := syscall.close(p.kq)
+       return os.ErrnoToError(e)
 }
index dc7ba2db149d74db5f73b9dc66c423853f76f89f..e20ca940112d1fbd892351d167b7a2ac4483c92b 100644 (file)
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-// Network file descriptors.
+// Waiting for FDs via epoll(7).
 
 package net
 
 import (
+       "net";
        "os";
        "syscall";
-       "net"
 )
 
-/* BUG 6g has trouble with this.
-
-export type FD os.FD;
+const (
+       Read = syscall.EPOLLIN | syscall.EPOLLRDHUP;
+       Write = syscall.EPOLLOUT
+)
 
-export func NewFD(fd int64) (nfd *FD, err *os.Error) {
-       ofd := os.NewFD(fd)
-       return ofd, nil
-}
+export type Pollster struct {
+       epfd int64;
 
-func (fd *FD) Close() *os.Error {
-       var ofd *os.FD = fd
-       return ofd.Close()
+       // Events we're already waiting for
+       events *map[int64] uint;
 }
 
-func (fd *FD) Read(p *[]byte) (n int, err *os.Error) {
-       var ofd *os.FD = fd;
-       n, err = ofd.Read(p)
-       return n, err
-}
+export func NewPollster() (p *Pollster, err *os.Error) {
+       p = new(Pollster);
+       var e int64;
 
-func (fd *FD) Write(p *[]byte) (n int, err *os.Error) {
-       var ofd *os.FD = fd;
-       n, err = ofd.Write(p)
-       return n, err
+       // The arg to epoll_create is a hint to the kernel
+       // about the number of FDs we will care about.
+       // We don't know.
+       if p.epfd, e = syscall.epoll_create(16); e != 0 {
+               return nil, os.ErrnoToError(e)
+       }
+       p.events = new(map[int64] uint)
+       return p, nil
 }
 
-*/
-
-// TODO: Replace with epoll.
-
-export type FD struct {
-       fd int64;
-       osfd *os.FD;
-}
+func (p *Pollster) AddFD(fd int64, mode int, repeat bool) *os.Error {
+       var ev syscall.EpollEvent
+       var already bool;
+       ev.fd = int32(fd);
+       ev.events, already = p.events[fd]
+       if !repeat {
+               ev.events |= syscall.EPOLLONESHOT
+       }
+       if mode == 'r' {
+               ev.events |= Read
+       } else {
+               ev.events |= Write
+       }
 
-export func NewFD(fd int64) (nfd *FD, err *os.Error) {
-       nfd = new(FD);
-       nfd.osfd = os.NewFD(fd);
-       nfd.fd = fd
-       return nfd, nil
+       var op int64
+       if already {
+               op = syscall.EPOLL_CTL_MOD
+       } else {
+               op = syscall.EPOLL_CTL_ADD
+       }
+       if e := syscall.epoll_ctl(p.epfd, op, fd, &ev); e != 0 {
+               return os.ErrnoToError(e)
+       }
+       p.events[fd] = ev.events
+       return nil
 }
 
-func (fd *FD) Close() *os.Error {
-       return fd.osfd.Close()
-}
+func (p *Pollster) StopWaiting(fd int64, bits uint) {
+       events, already := p.events[fd]
+       if !already {
+               print("Epoll unexpected fd=", fd, "\n")
+               return
+       }
 
-func (fd *FD) Read(p *[]byte) (n int, err *os.Error) {
-       n, err = fd.osfd.Read(p)
-       return n, err
-}
+       // If syscall.EPOLLONESHOT is not set, the wait
+       // is a repeating wait, so don't change it.
+       if events & syscall.EPOLLONESHOT == 0 {
+               return
+       }
 
-func (fd *FD) Write(p *[]byte) (n int, err *os.Error) {
-       n, err = fd.osfd.Write(p)
-       return n, err
+       // Disable the given bits.
+       // If we're still waiting for other events, modify the fd
+       // event in the kernel.  Otherwise, delete it.
+       events &= ^bits
+       if int32(events) & ^syscall.EPOLLONESHOT != 0 {
+               var ev syscall.EpollEvent;
+               ev.fd = int32(fd);
+               ev.events = events;
+               if e := syscall.epoll_ctl(p.epfd, syscall.EPOLL_CTL_MOD, fd, &ev); e != 0 {
+                       print("Epoll modify fd=", fd, ": ", os.ErrnoToError(e).String(), "\n")
+               }
+               p.events[fd] = events
+       } else {
+               if e := syscall.epoll_ctl(p.epfd, syscall.EPOLL_CTL_DEL, fd, nil); e != 0 {
+                       print("Epoll delete fd=", fd, ": ", os.ErrnoToError(e).String(), "\n")
+               }
+               p.events[fd] = 0, false
+       }
 }
 
-func (fd *FD) Accept(sa *syscall.Sockaddr) (nfd *FD, err *os.Error) {
-       s, e := syscall.accept(fd.fd, sa);
+func (p *Pollster) WaitFD() (fd int64, mode int, err *os.Error) {
+       // Get an event.
+       var evarray [1]syscall.EpollEvent;
+       ev := &evarray[0];
+       n, e := syscall.epoll_wait(p.epfd, &evarray, -1)
+       for e == syscall.EAGAIN || e == syscall.EINTR {
+               n, e = syscall.epoll_wait(p.epfd, &evarray, -1)
+       }
        if e != 0 {
-               return nil, os.ErrnoToError(e)
+               return -1, 0, os.ErrnoToError(e)
        }
-       nfd, err = NewFD(s)
-       return nfd, err
+       fd = int64(ev.fd)
+
+       if ev.events & Write != 0 {
+               p.StopWaiting(fd, Write)
+               return fd, 'w', nil
+       }
+       if ev.events & Read != 0 {
+               p.StopWaiting(fd, Read)
+               return fd, 'r', nil
+       }
+
+       // Other events are error conditions - wake whoever is waiting.
+       events, already := p.events[fd]
+       if events & Write != 0 {
+               p.StopWaiting(fd, Write)
+               return fd, 'w', nil
+       }
+       p.StopWaiting(fd, Read)
+       return fd, 'r', nil
 }
 
+func (p *Pollster) Close() *os.Error {
+       r, e := syscall.close(p.epfd)
+       return os.ErrnoToError(e)
+}
index bc0b8f6d3a9142893b78786d7eb52eca40558269..17ffa5a9ec91a1d0cae53d271e007ead06ddf5f6 100644 (file)
@@ -25,11 +25,21 @@ export func IPv4ToSockaddr(p *[]byte, port int) (sa1 *syscall.Sockaddr, err *os.
        return syscall.SockaddrInet4ToSockaddr(sa), nil
 }
 
+var IPv6zero [16]byte;
+
 export func IPv6ToSockaddr(p *[]byte, port int) (sa1 *syscall.Sockaddr, err *os.Error) {
        p = ToIPv6(p)
        if p == nil || port < 0 || port > 0xFFFF {
                return nil, os.EINVAL
        }
+
+       // IPv4 callers use 0.0.0.0 to mean "announce on any available address".
+       // In IPv6 mode, Linux treats that as meaning "announce on 0.0.0.0",
+       // which it refuses to do.  Rewrite to the IPv6 all zeros.
+       if p4 := ToIPv4(p); p4 != nil && p4[0] == 0 && p4[1] == 0 && p4[2] == 0 && p4[3] == 0 {
+               p = &IPv6zero;
+       }
+
        sa := new(syscall.SockaddrInet6);
        sa.family = syscall.AF_INET6;
        sa.port[0] = byte(port>>8);
index d0447d6e5373eb258bcaa4f17767076e1b69d131..c2205b990ed7e06d5147c8759aeaeaa97b63eee3 100644 (file)
@@ -23,6 +23,11 @@ TEXT syscall·KeventPtr(SB),7,$-8
        MOVQ    AX, 16(SP)
        RET
 
+TEXT   syscall·EpollEventPtr(SB),7,$-8
+       MOVQ    8(SP), AX
+       MOVQ    AX, 16(SP)
+       RET
+
 TEXT   syscall·LingerPtr(SB),7,$-8
        MOVQ    8(SP), AX
        MOVQ    AX, 16(SP)
index fdf690fbb2438246717abc49338641a8bb9f3e35..b5120a70b7fab345d13f5e66a854f5d385a70595 100644 (file)
@@ -78,7 +78,12 @@ export func unlink(name string) (ret int64, errno int64) {
        if !StringToBytes(&namebuf, name) {
                return -1, ENAMETOOLONG
        }
-       const SYSUNLINK = 87;
        r1, r2, err := Syscall(SYS_UNLINK, BytePtr(&namebuf[0]), 0, 0);
        return r1, err;
 }
+
+export func fcntl(fd, cmd, arg int64) (ret int64, errno int64) {
+       r1, r2, err := Syscall(SYS_FCNTL, fd, cmd, arg)
+       return r1, err
+}
+
index b690e81a2ca40de570ec5a9932afc9faef47e4ee..be55bca7bf1f9b0c9988d5071834ea18a53566a1 100644 (file)
@@ -95,5 +95,23 @@ export func getsockopt(fd, level, opt, valueptr, lenptr int64) (ret int64, errno
 }
 */
 
-// TODO: epoll
+export func epoll_create(size int64) (ret int64, errno int64) {
+       r1, r2, err := syscall.Syscall(SYS_EPOLL_CREATE, size, 0, 0);
+       return r1, err
+}
+
+export func epoll_ctl(epfd, op, fd int64, ev *EpollEvent) int64 {
+       r1, r2, err := syscall.Syscall6(SYS_EPOLL_CTL, epfd, op, fd, EpollEventPtr(ev), 0, 0);
+       return err
+}
+
+export func epoll_wait(epfd int64, ev *[]EpollEvent, msec int64) (ret int64, err int64) {
+       var evptr, nev int64;
+       if ev != nil && len(ev) > 0 {
+               nev = int64(len(ev));
+               evptr = EpollEventPtr(&ev[0])
+       }
+       r1, r2, err1 := syscall.Syscall6(SYS_EPOLL_WAIT, epfd, evptr, nev, msec, 0, 0);
+       return r1, err1
+}
 
index 7de141923d4aba492516da7cafcd60838b50178e..534827e3e9637ceecb237e36ff589c2ea0c14609 100644 (file)
@@ -37,6 +37,9 @@ export const (
        O_NDELAY = O_NONBLOCK;
        O_SYNC = 0x1000;
        O_TRUNC = 0x200;
+
+       F_GETFL = 3;
+       F_SETFL = 4;
 )
 
 export type Stat struct {
@@ -145,5 +148,25 @@ export func LingerPtr(l *Linger) int64;
 
 // Events (epoll)
 
-// TODO
+export const (
+       // EpollEvent.events
+       EPOLLIN = 0x1;
+       EPOLLOUT = 0x4;
+       EPOLLRDHUP = 0x2000;
+       EPOLLPRI = 0x2;
+       EPOLLERR = 0x8;
+       EPOLLET = 0x80000000;
+       EPOLLONESHOT = 0x40000000;
+
+       // op
+       EPOLL_CTL_ADD = 0x1;
+       EPOLL_CTL_MOD = 0x3;
+       EPOLL_CTL_DEL = 0x2;
+)
 
+export type EpollEvent struct {
+       events uint32;
+       fd int32;
+       pad int32;
+}
+export func EpollEventPtr(ev *EpollEvent) int64;