return 0, err
}
defer fd.readUnlock()
+ if fd.isFile {
+ fd.l.Lock()
+ defer fd.l.Unlock()
+ }
if len(buf) > maxRW {
buf = buf[:maxRW]
var n int
var err error
- if fd.isFile {
- fd.l.Lock()
- defer fd.l.Unlock()
- switch fd.kind {
- case kindConsole:
- n, err = fd.readConsole(buf)
- default:
- o := &fd.rop
- o.InitBuf(buf)
- n, err = execIO(o, func(o *operation) error {
- return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
- })
- fd.addOffset(n)
- if fd.kind == kindPipe && err != nil {
- switch err {
- case syscall.ERROR_BROKEN_PIPE:
- // Returned by pipes when the other end is closed.
- err = nil
- case syscall.ERROR_OPERATION_ABORTED:
- // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
- // If the fd is a pipe and the Read was interrupted by CancelIoEx,
- // we assume it is interrupted by Close.
- err = ErrFileClosing
- }
+ switch fd.kind {
+ case kindConsole:
+ n, err = fd.readConsole(buf)
+ case kindFile, kindPipe:
+ o := &fd.rop
+ o.InitBuf(buf)
+ n, err = execIO(o, func(o *operation) error {
+ return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
+ })
+ fd.addOffset(n)
+ if fd.kind == kindPipe && err != nil {
+ switch err {
+ case syscall.ERROR_BROKEN_PIPE:
+ // Returned by pipes when the other end is closed.
+ err = nil
+ case syscall.ERROR_OPERATION_ABORTED:
+ // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
+ // If the fd is a pipe and the Read was interrupted by CancelIoEx,
+ // we assume it is interrupted by Close.
+ err = ErrFileClosing
}
}
- if err != nil {
- n = 0
- }
- } else {
+ case kindNet:
o := &fd.rop
o.InitBuf(buf)
n, err = execIO(o, func(o *operation) error {
defer fd.l.Unlock()
}
- ntotal := 0
- for len(buf) > 0 {
- b := buf
- if len(b) > maxRW {
- b = b[:maxRW]
+ var ntotal int
+ for {
+ max := len(buf)
+ if max-ntotal > maxRW {
+ max = ntotal + maxRW
}
+ b := buf[ntotal:max]
var n int
var err error
- if fd.isFile {
- switch fd.kind {
- case kindConsole:
- n, err = fd.writeConsole(b)
- default:
- o := &fd.wop
- o.InitBuf(b)
- n, err = execIO(o, func(o *operation) error {
- return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
- })
- fd.addOffset(n)
- if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED {
- // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
- // If the fd is a pipe and the Write was interrupted by CancelIoEx,
- // we assume it is interrupted by Close.
- err = ErrFileClosing
- }
- }
- if err != nil {
- n = 0
+ switch fd.kind {
+ case kindConsole:
+ n, err = fd.writeConsole(b)
+ case kindPipe, kindFile:
+ o := &fd.wop
+ o.InitBuf(b)
+ n, err = execIO(o, func(o *operation) error {
+ return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
+ })
+ fd.addOffset(n)
+ if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED {
+ // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
+ // If the fd is a pipe and the Write was interrupted by CancelIoEx,
+ // we assume it is interrupted by Close.
+ err = ErrFileClosing
}
- } else {
+ case kindNet:
if race.Enabled {
race.ReleaseMerge(unsafe.Pointer(&ioSync))
}
})
}
ntotal += n
- if err != nil {
+ if ntotal == len(buf) || err != nil {
return ntotal, err
}
- buf = buf[n:]
+ if n == 0 {
+ return ntotal, io.ErrUnexpectedEOF
+ }
}
- return ntotal, nil
}
// writeConsole writes len(b) bytes to the console File.
defer syscall.Seek(fd.Sysfd, curoffset, io.SeekStart)
defer fd.setOffset(curoffset)
- ntotal := 0
- for len(buf) > 0 {
- b := buf
- if len(b) > maxRW {
- b = b[:maxRW]
+ var ntotal int
+ for {
+ max := len(buf)
+ if max-ntotal > maxRW {
+ max = ntotal + maxRW
}
+ b := buf[ntotal:max]
o := &fd.wop
o.InitBuf(b)
- fd.setOffset(off)
+ fd.setOffset(off + int64(ntotal))
n, err := execIO(o, func(o *operation) error {
return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, &o.o)
})
- ntotal += int(n)
- if err != nil {
+ if n > 0 {
+ ntotal += n
+ }
+ if ntotal == len(buf) || err != nil {
return ntotal, err
}
- buf = buf[n:]
- off += int64(n)
+ if n == 0 {
+ return ntotal, io.ErrUnexpectedEOF
+ }
}
- return ntotal, nil
}
// Writev emulates the Unix writev system call.
var pipeCounter atomic.Uint64
-func newPipe(t testing.TB, overlapped bool) (string, *poll.FD) {
+func newPipe(t testing.TB, overlapped, message bool) (string, *poll.FD) {
name := `\\.\pipe\go-internal-poll-test-` + currentProces() + `-` + strconv.FormatUint(pipeCounter.Add(1), 10)
wname, err := syscall.UTF16PtrFromString(name)
if err != nil {
if overlapped {
flags |= syscall.FILE_FLAG_OVERLAPPED
}
- h, err := windows.CreateNamedPipe(wname, uint32(flags), windows.PIPE_TYPE_BYTE, 1, 4096, 4096, 0, nil)
+ typ := windows.PIPE_TYPE_BYTE
+ if message {
+ typ = windows.PIPE_TYPE_MESSAGE
+ }
+ h, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, 4096, 4096, 0, nil)
if err != nil {
t.Fatal(err)
}
func TestPipe(t *testing.T) {
t.Run("overlapped", func(t *testing.T) {
- name, pipe := newPipe(t, true)
+ name, pipe := newPipe(t, true, false)
file := newFile(t, name, true)
testReadWrite(t, pipe, file)
})
t.Run("overlapped-write", func(t *testing.T) {
- name, pipe := newPipe(t, true)
+ name, pipe := newPipe(t, true, false)
file := newFile(t, name, false)
testReadWrite(t, file, pipe)
})
t.Run("overlapped-read", func(t *testing.T) {
- name, pipe := newPipe(t, false)
+ name, pipe := newPipe(t, false, false)
file := newFile(t, name, true)
testReadWrite(t, file, pipe)
})
t.Run("sync", func(t *testing.T) {
- name, pipe := newPipe(t, false)
+ name, pipe := newPipe(t, false, false)
file := newFile(t, name, false)
testReadWrite(t, file, pipe)
})
})
}
+func TestPipeWriteEOF(t *testing.T) {
+ name, pipe := newPipe(t, false, true)
+ file := newFile(t, name, false)
+ read := make(chan struct{}, 1)
+ go func() {
+ _, err := pipe.Write(nil)
+ read <- struct{}{}
+ if err != nil {
+ t.Error(err)
+ }
+ }()
+ <-read
+ var buf [10]byte
+ n, err := file.Read(buf[:])
+ if err != io.EOF {
+ t.Errorf("expected EOF, got %v", err)
+ }
+ if n != 0 {
+ t.Errorf("expected 0 bytes, got %d", n)
+ }
+}
+
func BenchmarkReadOverlapped(b *testing.B) {
benchmarkRead(b, true)
}
PIPE_ACCESS_OUTBOUND = 0x00000002
PIPE_ACCESS_DUPLEX = 0x00000003
- PIPE_TYPE_BYTE = 0x00000000
+ PIPE_TYPE_BYTE = 0x00000000
+ PIPE_TYPE_MESSAGE = 0x00000004
)
//sys GetOverlappedResult(handle syscall.Handle, overlapped *syscall.Overlapped, done *uint32, wait bool) (err error)