// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-// TODO(rsc): All the prints in this file should go to standard error.
-
package net
import (
// will the fd be closed.
type pollServer struct {
- cr, cw chan *netFD // buffered >= 1
- pr, pw *os.File
- pending map[int]*netFD
- poll *pollster // low-level OS hooks
- deadline int64 // next deadline (nsec since 1970)
+ cr, cw chan *netFD // buffered >= 1
+ pr, pw *os.File
+ poll *pollster // low-level OS hooks
+ sync.Mutex // controls pending and deadline
+ pending map[int]*netFD
+ deadline int64 // next deadline (nsec since 1970)
}
func (s *pollServer) AddFD(fd *netFD, mode int) {
}
return
}
- if err := s.poll.AddFD(intfd, mode, false); err != nil {
- panic("pollServer AddFD " + err.String())
- return
- }
+
+ s.Lock()
var t int64
key := intfd << 1
t = fd.wdeadline
}
s.pending[key] = fd
+ doWakeup := false
if t > 0 && (s.deadline == 0 || t < s.deadline) {
s.deadline = t
+ doWakeup = true
+ }
+
+ if err := s.poll.AddFD(intfd, mode, false); err != nil {
+ panic("pollServer AddFD " + err.String())
+ }
+
+ s.Unlock()
+
+ if doWakeup {
+ s.Wakeup()
}
}
+var wakeupbuf [1]byte
+
+func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
+
func (s *pollServer) LookupFD(fd int, mode int) *netFD {
key := fd << 1
if mode == 'w' {
func (s *pollServer) Run() {
var scratch [100]byte
+ s.Lock()
+ defer s.Unlock()
for {
var t = s.deadline
if t > 0 {
continue
}
}
- fd, mode, err := s.poll.WaitFD(t)
+ fd, mode, err := s.poll.WaitFD(s, t)
if err != nil {
print("pollServer WaitFD: ", err.String(), "\n")
return
// but it's unlikely that there are more than
// len(scratch) wakeup calls).
s.pr.Read(scratch[0:])
- // Read from channels
- Update:
- for {
- select {
- case fd := <-s.cr:
- s.AddFD(fd, 'r')
- case fd := <-s.cw:
- s.AddFD(fd, 'w')
- default:
- break Update
- }
- }
+ s.CheckDeadlines()
} else {
netfd := s.LookupFD(fd, mode)
if netfd == nil {
}
}
-var wakeupbuf [1]byte
-
-func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
-
func (s *pollServer) WaitRead(fd *netFD) {
- s.cr <- fd
- s.Wakeup()
+ s.AddFD(fd, 'r')
<-fd.cr
}
func (s *pollServer) WaitWrite(fd *netFD) {
- s.cw <- fd
- s.Wakeup()
+ s.AddFD(fd, 'w')
<-fd.cw
}
syscall.Kevent(p.kq, events[0:], events[0:], nil)
}
-func (p *pollster) WaitFD(nsec int64) (fd int, mode int, err os.Error) {
+func (p *pollster) WaitFD(s *pollServer, nsec int64) (fd int, mode int, err os.Error) {
var t *syscall.Timespec
for len(p.events) == 0 {
if nsec > 0 {
}
*t = syscall.NsecToTimespec(nsec)
}
+
+ s.Unlock()
nn, e := syscall.Kevent(p.kq, nil, p.eventbuf[0:], t)
+ s.Lock()
+
if e != 0 {
if e == syscall.EINTR {
continue
syscall.Kevent(p.kq, events[:], nil, nil)
}
-func (p *pollster) WaitFD(nsec int64) (fd int, mode int, err os.Error) {
+func (p *pollster) WaitFD(s *pollServer, nsec int64) (fd int, mode int, err os.Error) {
var t *syscall.Timespec
for len(p.events) == 0 {
if nsec > 0 {
}
*t = syscall.NsecToTimespec(nsec)
}
+
+ s.Unlock()
nn, e := syscall.Kevent(p.kq, nil, p.eventbuf[:], t)
+ s.Lock()
+
if e != 0 {
if e == syscall.EINTR {
continue
epfd int
// Events we're already waiting for
+ // Must hold pollServer lock
events map[int]uint32
}
}
func (p *pollster) AddFD(fd int, mode int, repeat bool) os.Error {
+ // pollServer is locked.
+
var ev syscall.EpollEvent
var already bool
ev.Fd = int32(fd)
}
func (p *pollster) StopWaiting(fd int, bits uint) {
+ // pollServer is locked.
+
events, already := p.events[fd]
if !already {
print("Epoll unexpected fd=", fd, "\n")
}
func (p *pollster) DelFD(fd int, mode int) {
+ // pollServer is locked.
+
if mode == 'r' {
p.StopWaiting(fd, readFlags)
} else {
}
}
-func (p *pollster) WaitFD(nsec int64) (fd int, mode int, err os.Error) {
+func (p *pollster) WaitFD(s *pollServer, nsec int64) (fd int, mode int, err os.Error) {
+ s.Unlock()
+
// Get an event.
var evarray [1]syscall.EpollEvent
ev := &evarray[0]
for e == syscall.EAGAIN || e == syscall.EINTR {
n, e = syscall.EpollWait(p.epfd, evarray[0:], msec)
}
+
+ s.Lock()
+
if e != 0 {
return -1, 0, os.NewSyscallError("epoll_wait", e)
}