]> Cypherpunks repositories - gostls13.git/commitdiff
net: implement asynchonous cancelable I/O on Plan 9
authorDavid du Colombier <0intro@gmail.com>
Fri, 11 Nov 2016 19:49:11 +0000 (20:49 +0100)
committerBrad Fitzpatrick <bradfitz@golang.org>
Sat, 12 Nov 2016 05:44:36 +0000 (05:44 +0000)
This change is an experimental implementation of asynchronous
cancelable I/O operations on Plan 9, which are required to
implement deadlines.

There are no asynchronous syscalls on Plan 9. I/O operations
are performed with blocking pread and pwrite syscalls.

Implementing deadlines in Go requires a way to interrupt
I/O operations.

It is possible to interrupt reads and writes on a TCP connection
by forcing the closure of the TCP connection. This approach
has been used successfully in CL 31390.

However, we can't implement deadlines with this method, since
we require to be able to reuse the connection after the timeout.

On Plan 9, I/O operations are interrupted when the process
receives a note. We can rely on this behavior to implement
a more generic approach.

When doing an I/O operation (read or write), we start the I/O in
its own process, then wait for the result asynchronously. The
process is able to handle the "hangup" note. When receiving the
"hangup" note, the currently running I/O operation is canceled
and the process returns.

This way, deadlines can be implemented by sending an "hangup"
note to the process running the blocking I/O operation, after
the expiration of a timer.

Fixes #11932.
Fixes #17498.

Change-Id: I414f72c7a9a4f9b8f9c09ed3b6c269f899d9b430
Reviewed-on: https://go-review.googlesource.com/31521
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
src/net/fd_io_plan9.go [new file with mode: 0644]
src/net/fd_plan9.go
src/net/tcpsock_test.go
src/runtime/net_plan9.go [new file with mode: 0644]
src/runtime/os3_plan9.go
src/runtime/os_plan9.go

diff --git a/src/net/fd_io_plan9.go b/src/net/fd_io_plan9.go
new file mode 100644 (file)
index 0000000..76da0c5
--- /dev/null
@@ -0,0 +1,93 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import (
+       "os"
+       "runtime"
+       "sync"
+       "syscall"
+)
+
+// asyncIO implements asynchronous cancelable I/O.
+// An asyncIO represents a single asynchronous Read or Write
+// operation. The result is returned on the result channel.
+// The undergoing I/O system call can either complete or be
+// interrupted by a note.
+type asyncIO struct {
+       res chan result
+
+       // mu guards the pid field.
+       mu sync.Mutex
+
+       // pid holds the process id of
+       // the process running the IO operation.
+       pid int
+}
+
+// result is the return value of a Read or Write operation.
+type result struct {
+       n   int
+       err error
+}
+
+// newAsyncIO returns a new asyncIO that performs an I/O
+// operation by calling fn, which must do one and only one
+// interruptible system call.
+func newAsyncIO(fn func([]byte) (int, error), b []byte) *asyncIO {
+       aio := &asyncIO{
+               res: make(chan result, 0),
+       }
+       aio.mu.Lock()
+       go func() {
+               // Lock the current goroutine to its process
+               // and store the pid in io so that Cancel can
+               // interrupt it. We ignore the "hangup" signal,
+               // so the signal does not take down the entire
+               // Go runtime.
+               runtime.LockOSThread()
+               runtime_ignoreHangup()
+               aio.pid = os.Getpid()
+               aio.mu.Unlock()
+
+               n, err := fn(b)
+
+               aio.mu.Lock()
+               aio.pid = -1
+               runtime_unignoreHangup()
+               aio.mu.Unlock()
+
+               aio.res <- result{n, err}
+       }()
+       return aio
+}
+
+var hangupNote os.Signal = syscall.Note("hangup")
+
+// Cancel interrupts the I/O operation, causing
+// the Wait function to return.
+func (aio *asyncIO) Cancel() {
+       aio.mu.Lock()
+       defer aio.mu.Unlock()
+       if aio.pid == -1 {
+               return
+       }
+       proc, err := os.FindProcess(aio.pid)
+       if err != nil {
+               return
+       }
+       proc.Signal(hangupNote)
+}
+
+// Wait for the I/O operation to complete.
+func (aio *asyncIO) Wait() (int, error) {
+       res := <-aio.res
+       return res.n, res.err
+}
+
+// The following functions, provided by the runtime, are used to
+// ignore and unignore the "hangup" signal received by the process.
+func runtime_ignoreHangup()
+func runtime_unignoreHangup()
index ab5db38dbec0c2aba7be4f7712b7178595827ce0..300d8c4543e90bb93cf3122e69abe958e787d6d6 100644 (file)
@@ -7,10 +7,17 @@ package net
 import (
        "io"
        "os"
+       "sync/atomic"
        "syscall"
        "time"
 )
 
+type atomicBool int32
+
+func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 }
+func (b *atomicBool) setFalse()   { atomic.StoreInt32((*int32)(b), 0) }
+func (b *atomicBool) setTrue()    { atomic.StoreInt32((*int32)(b), 1) }
+
 // Network file descriptor.
 type netFD struct {
        // locking/lifetime of sysfd + serialize access to Read and Write methods
@@ -23,6 +30,14 @@ type netFD struct {
        listen, ctl, data *os.File
        laddr, raddr      Addr
        isStream          bool
+
+       // deadlines
+       raio      *asyncIO
+       waio      *asyncIO
+       rtimer    *time.Timer
+       wtimer    *time.Timer
+       rtimedout atomicBool // set true when read deadline has been reached
+       wtimedout atomicBool // set true when write deadline has been reached
 }
 
 var (
@@ -84,6 +99,9 @@ func (fd *netFD) destroy() {
 }
 
 func (fd *netFD) Read(b []byte) (n int, err error) {
+       if fd.rtimedout.isSet() {
+               return 0, errTimeout
+       }
        if !fd.ok() || fd.data == nil {
                return 0, syscall.EINVAL
        }
@@ -94,10 +112,15 @@ func (fd *netFD) Read(b []byte) (n int, err error) {
        if len(b) == 0 {
                return 0, nil
        }
-       n, err = fd.data.Read(b)
+       fd.raio = newAsyncIO(fd.data.Read, b)
+       n, err = fd.raio.Wait()
+       fd.raio = nil
        if isHangup(err) {
                err = io.EOF
        }
+       if isInterrupted(err) {
+               err = errTimeout
+       }
        if fd.net == "udp" && err == io.EOF {
                n = 0
                err = nil
@@ -106,6 +129,9 @@ func (fd *netFD) Read(b []byte) (n int, err error) {
 }
 
 func (fd *netFD) Write(b []byte) (n int, err error) {
+       if fd.wtimedout.isSet() {
+               return 0, errTimeout
+       }
        if !fd.ok() || fd.data == nil {
                return 0, syscall.EINVAL
        }
@@ -113,7 +139,13 @@ func (fd *netFD) Write(b []byte) (n int, err error) {
                return 0, err
        }
        defer fd.writeUnlock()
-       return fd.data.Write(b)
+       fd.waio = newAsyncIO(fd.data.Write, b)
+       n, err = fd.waio.Wait()
+       fd.waio = nil
+       if isInterrupted(err) {
+               err = errTimeout
+       }
+       return
 }
 
 func (fd *netFD) closeRead() error {
@@ -185,15 +217,74 @@ func (fd *netFD) file(f *os.File, s string) (*os.File, error) {
 }
 
 func (fd *netFD) setDeadline(t time.Time) error {
-       return syscall.EPLAN9
+       return setDeadlineImpl(fd, t, 'r'+'w')
 }
 
 func (fd *netFD) setReadDeadline(t time.Time) error {
-       return syscall.EPLAN9
+       return setDeadlineImpl(fd, t, 'r')
 }
 
 func (fd *netFD) setWriteDeadline(t time.Time) error {
-       return syscall.EPLAN9
+       return setDeadlineImpl(fd, t, 'w')
+}
+
+func setDeadlineImpl(fd *netFD, t time.Time, mode int) error {
+       d := t.Sub(time.Now())
+       if mode == 'r' || mode == 'r'+'w' {
+               fd.rtimedout.setFalse()
+       }
+       if mode == 'w' || mode == 'r'+'w' {
+               fd.wtimedout.setFalse()
+       }
+       if t.IsZero() || d < 0 {
+               // Stop timer
+               if mode == 'r' || mode == 'r'+'w' {
+                       if fd.rtimer != nil {
+                               fd.rtimer.Stop()
+                       }
+                       fd.rtimer = nil
+               }
+               if mode == 'w' || mode == 'r'+'w' {
+                       if fd.wtimer != nil {
+                               fd.wtimer.Stop()
+                       }
+                       fd.wtimer = nil
+               }
+       } else {
+               // Interrupt I/O operation once timer has expired
+               if mode == 'r' || mode == 'r'+'w' {
+                       fd.rtimer = time.AfterFunc(d, func() {
+                               fd.rtimedout.setTrue()
+                               if fd.raio != nil {
+                                       fd.raio.Cancel()
+                               }
+                       })
+               }
+               if mode == 'w' || mode == 'r'+'w' {
+                       fd.wtimer = time.AfterFunc(d, func() {
+                               fd.wtimedout.setTrue()
+                               if fd.waio != nil {
+                                       fd.waio.Cancel()
+                               }
+                       })
+               }
+       }
+       if !t.IsZero() && d < 0 {
+               // Interrupt current I/O operation
+               if mode == 'r' || mode == 'r'+'w' {
+                       fd.rtimedout.setTrue()
+                       if fd.raio != nil {
+                               fd.raio.Cancel()
+                       }
+               }
+               if mode == 'w' || mode == 'r'+'w' {
+                       fd.wtimedout.setTrue()
+                       if fd.waio != nil {
+                               fd.waio.Cancel()
+                       }
+               }
+       }
+       return nil
 }
 
 func setReadBuffer(fd *netFD, bytes int) error {
@@ -207,3 +298,7 @@ func setWriteBuffer(fd *netFD, bytes int) error {
 func isHangup(err error) bool {
        return err != nil && stringsHasSuffix(err.Error(), "Hangup")
 }
+
+func isInterrupted(err error) bool {
+       return err != nil && stringsHasSuffix(err.Error(), "interrupted")
+}
index 8b2d2ca484c5ba1f45291c70ac4f8411e9550099..7c8610d32baa808877aa4d0de01d5a795f6929b7 100644 (file)
@@ -467,6 +467,11 @@ func TestTCPConcurrentAccept(t *testing.T) {
 
 func TestTCPReadWriteAllocs(t *testing.T) {
        switch runtime.GOOS {
+       case "plan9":
+               // The implementation of asynchronous cancelable
+               // I/O on Plan 9 allocates memory.
+               // See net/fd_io_plan9.go.
+               t.Skipf("not supported on %s", runtime.GOOS)
        case "nacl":
                // NaCl needs to allocate pseudo file descriptor
                // stuff. See syscall/fd_nacl.go.
diff --git a/src/runtime/net_plan9.go b/src/runtime/net_plan9.go
new file mode 100644 (file)
index 0000000..10fd089
--- /dev/null
@@ -0,0 +1,29 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package runtime
+
+import (
+       _ "unsafe"
+)
+
+//go:linkname runtime_ignoreHangup net.runtime_ignoreHangup
+func runtime_ignoreHangup() {
+       getg().m.ignoreHangup = true
+}
+
+//go:linkname runtime_unignoreHangup net.runtime_unignoreHangup
+func runtime_unignoreHangup(sig string) {
+       getg().m.ignoreHangup = false
+}
+
+func ignoredNote(note *byte) bool {
+       if note == nil {
+               return false
+       }
+       if gostringnocopy(note) != "hangup" {
+               return false
+       }
+       return getg().m.ignoreHangup
+}
index aff1d05b2585b309e849675020437169b96ca01d..26b4acd89a37ef972477aff263c8a199c6cb55b5 100644 (file)
@@ -100,6 +100,9 @@ func sighandler(_ureg *ureg, note *byte, gp *g) int {
                return _NCONT
        }
        if flags&_SigNotify != 0 {
+               if ignoredNote(note) {
+                       return _NCONT
+               }
                if sendNote(note) {
                        return _NCONT
                }
index 032aec1a46c7f1d44e2de0f7b15e47ff77f2aa09..ba2d5c55258e46dfd05742acd7656d7bfd0efc81 100644 (file)
@@ -13,6 +13,7 @@ type mOS struct {
        waitsemacount uint32
        notesig       *int8
        errstr        *byte
+       ignoreHangup  bool
 }
 
 func closefd(fd int32) int32