import (
"errors"
"io"
+ "sync"
"sync/atomic"
"time"
)
Destroy func()
// deadlines
+ rmu sync.Mutex
+ wmu sync.Mutex
raio *asyncIO
waio *asyncIO
rtimer *time.Timer
// Read implements io.Reader.
func (fd *FD) Read(fn func([]byte) (int, error), b []byte) (int, error) {
- if fd.rtimedout.isSet() {
- return 0, ErrDeadlineExceeded
- }
if err := fd.readLock(); err != nil {
return 0, err
}
if len(b) == 0 {
return 0, nil
}
+ fd.rmu.Lock()
+ if fd.rtimedout.isSet() {
+ fd.rmu.Unlock()
+ return 0, ErrDeadlineExceeded
+ }
fd.raio = newAsyncIO(fn, b)
+ fd.rmu.Unlock()
n, err := fd.raio.Wait()
fd.raio = nil
if isHangup(err) {
// Write implements io.Writer.
func (fd *FD) Write(fn func([]byte) (int, error), b []byte) (int, error) {
- if fd.wtimedout.isSet() {
- return 0, ErrDeadlineExceeded
- }
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.writeUnlock()
+ fd.wmu.Lock()
+ if fd.wtimedout.isSet() {
+ fd.wmu.Unlock()
+ return 0, ErrDeadlineExceeded
+ }
fd.waio = newAsyncIO(fn, b)
+ fd.wmu.Unlock()
n, err := fd.waio.Wait()
fd.waio = nil
if isInterrupted(err) {
func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
d := t.Sub(time.Now())
if mode == 'r' || mode == 'r'+'w' {
+ fd.rmu.Lock()
+ defer fd.rmu.Unlock()
fd.rtimedout.setFalse()
}
if mode == 'w' || mode == 'r'+'w' {
+ fd.wmu.Lock()
+ defer fd.wmu.Unlock()
fd.wtimedout.setFalse()
}
if t.IsZero() || d < 0 {
// Interrupt I/O operation once timer has expired
if mode == 'r' || mode == 'r'+'w' {
fd.rtimer = time.AfterFunc(d, func() {
+ fd.rmu.Lock()
fd.rtimedout.setTrue()
if fd.raio != nil {
fd.raio.Cancel()
}
+ fd.rmu.Unlock()
})
}
if mode == 'w' || mode == 'r'+'w' {
fd.wtimer = time.AfterFunc(d, func() {
+ fd.wmu.Lock()
fd.wtimedout.setTrue()
if fd.waio != nil {
fd.waio.Cancel()
}
+ fd.wmu.Unlock()
})
}
}