--- /dev/null
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import (
+ "os"
+ "runtime"
+ "sync"
+ "syscall"
+)
+
+// asyncIO implements asynchronous cancelable I/O.
+// An asyncIO represents a single asynchronous Read or Write
+// operation. The result is returned on the result channel.
+// The undergoing I/O system call can either complete or be
+// interrupted by a note.
+type asyncIO struct {
+ res chan result
+
+ // mu guards the pid field.
+ mu sync.Mutex
+
+ // pid holds the process id of
+ // the process running the IO operation.
+ pid int
+}
+
+// result is the return value of a Read or Write operation.
+type result struct {
+ n int
+ err error
+}
+
+// newAsyncIO returns a new asyncIO that performs an I/O
+// operation by calling fn, which must do one and only one
+// interruptible system call.
+func newAsyncIO(fn func([]byte) (int, error), b []byte) *asyncIO {
+ aio := &asyncIO{
+ res: make(chan result, 0),
+ }
+ aio.mu.Lock()
+ go func() {
+ // Lock the current goroutine to its process
+ // and store the pid in io so that Cancel can
+ // interrupt it. We ignore the "hangup" signal,
+ // so the signal does not take down the entire
+ // Go runtime.
+ runtime.LockOSThread()
+ runtime_ignoreHangup()
+ aio.pid = os.Getpid()
+ aio.mu.Unlock()
+
+ n, err := fn(b)
+
+ aio.mu.Lock()
+ aio.pid = -1
+ runtime_unignoreHangup()
+ aio.mu.Unlock()
+
+ aio.res <- result{n, err}
+ }()
+ return aio
+}
+
+var hangupNote os.Signal = syscall.Note("hangup")
+
+// Cancel interrupts the I/O operation, causing
+// the Wait function to return.
+func (aio *asyncIO) Cancel() {
+ aio.mu.Lock()
+ defer aio.mu.Unlock()
+ if aio.pid == -1 {
+ return
+ }
+ proc, err := os.FindProcess(aio.pid)
+ if err != nil {
+ return
+ }
+ proc.Signal(hangupNote)
+}
+
+// Wait for the I/O operation to complete.
+func (aio *asyncIO) Wait() (int, error) {
+ res := <-aio.res
+ return res.n, res.err
+}
+
+// The following functions, provided by the runtime, are used to
+// ignore and unignore the "hangup" signal received by the process.
+func runtime_ignoreHangup()
+func runtime_unignoreHangup()
import (
"io"
"os"
+ "sync/atomic"
"syscall"
"time"
)
+type atomicBool int32
+
+func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 }
+func (b *atomicBool) setFalse() { atomic.StoreInt32((*int32)(b), 0) }
+func (b *atomicBool) setTrue() { atomic.StoreInt32((*int32)(b), 1) }
+
// Network file descriptor.
type netFD struct {
// locking/lifetime of sysfd + serialize access to Read and Write methods
listen, ctl, data *os.File
laddr, raddr Addr
isStream bool
+
+ // deadlines
+ raio *asyncIO
+ waio *asyncIO
+ rtimer *time.Timer
+ wtimer *time.Timer
+ rtimedout atomicBool // set true when read deadline has been reached
+ wtimedout atomicBool // set true when write deadline has been reached
}
var (
}
func (fd *netFD) Read(b []byte) (n int, err error) {
+ if fd.rtimedout.isSet() {
+ return 0, errTimeout
+ }
if !fd.ok() || fd.data == nil {
return 0, syscall.EINVAL
}
if len(b) == 0 {
return 0, nil
}
- n, err = fd.data.Read(b)
+ fd.raio = newAsyncIO(fd.data.Read, b)
+ n, err = fd.raio.Wait()
+ fd.raio = nil
if isHangup(err) {
err = io.EOF
}
+ if isInterrupted(err) {
+ err = errTimeout
+ }
if fd.net == "udp" && err == io.EOF {
n = 0
err = nil
}
func (fd *netFD) Write(b []byte) (n int, err error) {
+ if fd.wtimedout.isSet() {
+ return 0, errTimeout
+ }
if !fd.ok() || fd.data == nil {
return 0, syscall.EINVAL
}
return 0, err
}
defer fd.writeUnlock()
- return fd.data.Write(b)
+ fd.waio = newAsyncIO(fd.data.Write, b)
+ n, err = fd.waio.Wait()
+ fd.waio = nil
+ if isInterrupted(err) {
+ err = errTimeout
+ }
+ return
}
func (fd *netFD) closeRead() error {
}
func (fd *netFD) setDeadline(t time.Time) error {
- return syscall.EPLAN9
+ return setDeadlineImpl(fd, t, 'r'+'w')
}
func (fd *netFD) setReadDeadline(t time.Time) error {
- return syscall.EPLAN9
+ return setDeadlineImpl(fd, t, 'r')
}
func (fd *netFD) setWriteDeadline(t time.Time) error {
- return syscall.EPLAN9
+ return setDeadlineImpl(fd, t, 'w')
+}
+
+func setDeadlineImpl(fd *netFD, t time.Time, mode int) error {
+ d := t.Sub(time.Now())
+ if mode == 'r' || mode == 'r'+'w' {
+ fd.rtimedout.setFalse()
+ }
+ if mode == 'w' || mode == 'r'+'w' {
+ fd.wtimedout.setFalse()
+ }
+ if t.IsZero() || d < 0 {
+ // Stop timer
+ if mode == 'r' || mode == 'r'+'w' {
+ if fd.rtimer != nil {
+ fd.rtimer.Stop()
+ }
+ fd.rtimer = nil
+ }
+ if mode == 'w' || mode == 'r'+'w' {
+ if fd.wtimer != nil {
+ fd.wtimer.Stop()
+ }
+ fd.wtimer = nil
+ }
+ } else {
+ // Interrupt I/O operation once timer has expired
+ if mode == 'r' || mode == 'r'+'w' {
+ fd.rtimer = time.AfterFunc(d, func() {
+ fd.rtimedout.setTrue()
+ if fd.raio != nil {
+ fd.raio.Cancel()
+ }
+ })
+ }
+ if mode == 'w' || mode == 'r'+'w' {
+ fd.wtimer = time.AfterFunc(d, func() {
+ fd.wtimedout.setTrue()
+ if fd.waio != nil {
+ fd.waio.Cancel()
+ }
+ })
+ }
+ }
+ if !t.IsZero() && d < 0 {
+ // Interrupt current I/O operation
+ if mode == 'r' || mode == 'r'+'w' {
+ fd.rtimedout.setTrue()
+ if fd.raio != nil {
+ fd.raio.Cancel()
+ }
+ }
+ if mode == 'w' || mode == 'r'+'w' {
+ fd.wtimedout.setTrue()
+ if fd.waio != nil {
+ fd.waio.Cancel()
+ }
+ }
+ }
+ return nil
}
func setReadBuffer(fd *netFD, bytes int) error {
func isHangup(err error) bool {
return err != nil && stringsHasSuffix(err.Error(), "Hangup")
}
+
+func isInterrupted(err error) bool {
+ return err != nil && stringsHasSuffix(err.Error(), "interrupted")
+}
--- /dev/null
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package runtime
+
+import (
+ _ "unsafe"
+)
+
+//go:linkname runtime_ignoreHangup net.runtime_ignoreHangup
+func runtime_ignoreHangup() {
+ getg().m.ignoreHangup = true
+}
+
+//go:linkname runtime_unignoreHangup net.runtime_unignoreHangup
+func runtime_unignoreHangup(sig string) {
+ getg().m.ignoreHangup = false
+}
+
+func ignoredNote(note *byte) bool {
+ if note == nil {
+ return false
+ }
+ if gostringnocopy(note) != "hangup" {
+ return false
+ }
+ return getg().m.ignoreHangup
+}