From: qmuntal Date: Fri, 30 Jan 2026 13:29:58 +0000 (+0100) Subject: internal/poll: move buffer pinning inside execIO X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=cce3fea08f8b2509b7651ee552111a8cc763a459;p=gostls13.git internal/poll: move buffer pinning inside execIO This is a step towards deferring adding the handle to IOCP until the first IO operation. The goal of this CL is to avoid the fd.isBlocking check in fd.pin, which was happening outside execIO, and making buffer pinning less error-prone. This also fixes an issue where buffer used in Pwrite and WriteTo were unpinned too early when the write buffer was larger than the maximum chunk size. For #76391 Cq-Include-Trybots: luci.golang.try:gotip-windows-amd64-longtest,gotip-windows-amd64-race Change-Id: Ia181dcb57a559ae466a4341c36a307ad6678aac0 Reviewed-on: https://go-review.googlesource.com/c/go/+/740561 Reviewed-by: Damien Neil Reviewed-by: Michael Pratt LUCI-TryBot-Result: Go LUCI --- diff --git a/src/internal/poll/fd_windows.go b/src/internal/poll/fd_windows.go index ef8050daf4..699239c57b 100644 --- a/src/internal/poll/fd_windows.go +++ b/src/internal/poll/fd_windows.go @@ -232,27 +232,10 @@ func (fd *FD) waitIO(o *operation) error { return err } -// pin pins ptr for the duration of the IO operation. -// If fd is in blocking mode, pin does nothing. -func (fd *FD) pin(mode int, ptr any) { - if fd.isBlocking { - return - } - if mode == 'r' { - fd.readPinner.Pin(ptr) - } else { - fd.writePinner.Pin(ptr) - } -} - // execIO executes a single IO operation o. // It supports both synchronous and asynchronous IO. -func (fd *FD) execIO(mode int, submit func(o *operation) (uint32, error)) (int, error) { - if mode == 'r' { - defer fd.readPinner.Unpin() - } else { - defer fd.writePinner.Unpin() - } +// buf, if not nil, will be pinned during the lifetime of the operation. +func (fd *FD) execIO(mode int, submit func(o *operation) (uint32, error), buf []byte) (int, error) { // Notify runtime netpoll about starting IO. err := fd.pd.prepare(mode, fd.isFile) if err != nil { @@ -268,21 +251,37 @@ func (fd *FD) execIO(mode int, submit func(o *operation) (uint32, error)) (int, runtimeCtx: fd.pd.runtimeCtx, mode: int32(mode), } - // Start IO. - if !fd.isBlocking && !fd.pollable() { - // If the handle is opened for overlapped IO but we can't - // use the runtime poller, then we need to use an - // event to wait for the IO to complete. - h, err := windows.CreateEvent(nil, 0, 0, nil) - if err != nil { - // This shouldn't happen when all CreateEvent arguments are zero. - panic(err) + if !fd.isBlocking { + if len(buf) > 0 { + ptr := unsafe.SliceData(buf) + if mode == 'r' { + fd.readPinner.Pin(ptr) + } else { + fd.writePinner.Pin(ptr) + } + defer func() { + if mode == 'r' { + fd.readPinner.Unpin() + } else { + fd.writePinner.Unpin() + } + }() + } + if !fd.pollable() { + // If the handle is opened for overlapped IO but we can't + // use the runtime poller, then we need to use an + // event to wait for the IO to complete. + h, err := windows.CreateEvent(nil, 0, 0, nil) + if err != nil { + // This shouldn't happen when all CreateEvent arguments are zero. + panic(err) + } + // Set the low bit so that the external IOCP doesn't receive the completion packet. + o.o.HEvent = h | 1 + defer syscall.CloseHandle(h) } - // Set the low bit so that the external IOCP doesn't receive the completion packet. - o.o.HEvent = h | 1 - defer syscall.CloseHandle(h) } - fd.pin(mode, o) + // Start IO. qty, err := submit(o) var waitErr error // Blocking operations shouldn't return ERROR_IO_PENDING. @@ -544,10 +543,6 @@ func (fd *FD) Read(buf []byte) (int, error) { defer fd.readUnlock() } - if len(buf) > 0 { - fd.pin('r', &buf[0]) - } - if len(buf) > maxRW { buf = buf[:maxRW] } @@ -561,7 +556,7 @@ func (fd *FD) Read(buf []byte) (int, error) { n, err = fd.execIO('r', func(o *operation) (qty uint32, err error) { err = syscall.ReadFile(fd.Sysfd, buf, &qty, fd.overlapped(o)) return qty, err - }) + }, buf) fd.addOffset(n) switch err { case syscall.ERROR_HANDLE_EOF: @@ -577,7 +572,7 @@ func (fd *FD) Read(buf []byte) (int, error) { var flags uint32 err = syscall.WSARecv(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, &o.o, nil) return qty, err - }) + }, buf) if race.Enabled { race.Acquire(unsafe.Pointer(&ioSync)) } @@ -674,10 +669,6 @@ func (fd *FD) Pread(buf []byte, off int64) (int, error) { } defer fd.readWriteUnlock() - if len(buf) > 0 { - fd.pin('r', &buf[0]) - } - if len(buf) > maxRW { buf = buf[:maxRW] } @@ -702,7 +693,7 @@ func (fd *FD) Pread(buf []byte, off int64) (int, error) { n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) { err = syscall.ReadFile(fd.Sysfd, buf, &qty, &o.o) return qty, err - }) + }, buf) if err == syscall.ERROR_HANDLE_EOF { err = io.EOF } @@ -725,8 +716,6 @@ func (fd *FD) ReadFrom(buf []byte) (int, syscall.Sockaddr, error) { } defer fd.readUnlock() - fd.pin('r', &buf[0]) - rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny) defer wsaRsaPool.Put(rsa) n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) { @@ -734,7 +723,7 @@ func (fd *FD) ReadFrom(buf []byte) (int, syscall.Sockaddr, error) { var flags uint32 err = syscall.WSARecvFrom(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, rsa, &rsan, &o.o, nil) return qty, err - }) + }, buf) err = fd.eofError(n, err) if err != nil { return n, nil, err @@ -756,8 +745,6 @@ func (fd *FD) ReadFromInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error) } defer fd.readUnlock() - fd.pin('r', &buf[0]) - rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny) defer wsaRsaPool.Put(rsa) n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) { @@ -765,7 +752,7 @@ func (fd *FD) ReadFromInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error) var flags uint32 err = syscall.WSARecvFrom(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, rsa, &rsan, &o.o, nil) return qty, err - }) + }, buf) err = fd.eofError(n, err) if err != nil { return n, err @@ -787,8 +774,6 @@ func (fd *FD) ReadFromInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error) } defer fd.readUnlock() - fd.pin('r', &buf[0]) - rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny) defer wsaRsaPool.Put(rsa) n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) { @@ -796,7 +781,7 @@ func (fd *FD) ReadFromInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error) var flags uint32 err = syscall.WSARecvFrom(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, rsa, &rsan, &o.o, nil) return qty, err - }) + }, buf) err = fd.eofError(n, err) if err != nil { return n, err @@ -819,9 +804,6 @@ func (fd *FD) Write(buf []byte) (int, error) { defer fd.writeUnlock() } - if len(buf) > 0 { - fd.pin('w', &buf[0]) - } var ntotal int for { max := len(buf) @@ -838,7 +820,7 @@ func (fd *FD) Write(buf []byte) (int, error) { n, err = fd.execIO('w', func(o *operation) (qty uint32, err error) { err = syscall.WriteFile(fd.Sysfd, b, &qty, fd.overlapped(o)) return qty, err - }) + }, b) fd.addOffset(n) case kindNet: if race.Enabled { @@ -847,7 +829,7 @@ func (fd *FD) Write(buf []byte) (int, error) { n, err = fd.execIO('w', func(o *operation) (qty uint32, err error) { err = syscall.WSASend(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, &o.o, nil) return qty, err - }) + }, b) } ntotal += n if ntotal == len(buf) || err != nil { @@ -914,10 +896,6 @@ func (fd *FD) Pwrite(buf []byte, off int64) (int, error) { } defer fd.readWriteUnlock() - if len(buf) > 0 { - fd.pin('w', &buf[0]) - } - if fd.isBlocking { curoffset, err := syscall.Seek(fd.Sysfd, 0, io.SeekCurrent) if err != nil { @@ -945,7 +923,7 @@ func (fd *FD) Pwrite(buf []byte, off int64) (int, error) { n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) { err = syscall.WriteFile(fd.Sysfd, buf[ntotal:max], &qty, &o.o) return qty, err - }) + }, buf[ntotal:max]) if n > 0 { ntotal += n } @@ -975,7 +953,7 @@ func (fd *FD) Writev(buf *[][]byte) (int64, error) { n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) { err = syscall.WSASend(fd.Sysfd, &(*bufs)[0], uint32(len(*bufs)), &qty, 0, &o.o, nil) return qty, err - }) + }, nil) TestHookDidWritev(n) consume(buf, int64(n)) return int64(n), err @@ -993,12 +971,10 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) { err = syscall.WSASendto(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, 0, sa, &o.o, nil) return qty, err - }) + }, nil) return n, err } - fd.pin('w', &buf[0]) - ntotal := 0 for len(buf) > 0 { b := buf @@ -1008,7 +984,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) { err = syscall.WSASendto(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, sa, &o.o, nil) return qty, err - }) + }, b) ntotal += int(n) if err != nil { return ntotal, err @@ -1030,12 +1006,10 @@ func (fd *FD) WriteToInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error) n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) { err = windows.WSASendtoInet4(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, 0, sa4, &o.o, nil) return qty, err - }) + }, nil) return n, err } - fd.pin('w', &buf[0]) - ntotal := 0 for len(buf) > 0 { b := buf @@ -1045,7 +1019,7 @@ func (fd *FD) WriteToInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error) n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) { err = windows.WSASendtoInet4(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, sa4, &o.o, nil) return qty, err - }) + }, b) ntotal += int(n) if err != nil { return ntotal, err @@ -1067,12 +1041,10 @@ func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error) n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) { err = windows.WSASendtoInet6(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, 0, sa6, &o.o, nil) return qty, err - }) + }, nil) return n, err } - fd.pin('w', &buf[0]) - ntotal := 0 for len(buf) > 0 { b := buf @@ -1082,7 +1054,7 @@ func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error) n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) { err = windows.WSASendtoInet6(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, sa6, &o.o, nil) return qty, err - }) + }, b) ntotal += int(n) if err != nil { return ntotal, err @@ -1098,7 +1070,7 @@ func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error) func (fd *FD) ConnectEx(ra syscall.Sockaddr) error { _, err := fd.execIO('w', func(o *operation) (uint32, error) { return 0, ConnectExFunc(fd.Sysfd, ra, nil, 0, nil, &o.o) - }) + }, nil) return err } @@ -1109,7 +1081,7 @@ func (fd *FD) acceptOne(s syscall.Handle, rawsa []syscall.RawSockaddrAny) (strin err = AcceptFunc(fd.Sysfd, s, (*byte)(unsafe.Pointer(&rawsa[0])), 0, rsan, rsan, &qty, &o.o) return qty, err - }) + }, nil) if err != nil { CloseFunc(s) return "acceptex", err @@ -1275,7 +1247,7 @@ func (fd *FD) RawRead(f func(uintptr) bool) error { } err = syscall.WSARecv(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, &flags, &o.o, nil) return qty, err - }) + }, nil) if err == windows.WSAEMSGSIZE { // expected with a 0-byte peek, ignore. } else if err != nil { @@ -1366,7 +1338,7 @@ func (fd *FD) ReadMsg(p []byte, oob []byte, flags int) (int, int, int, syscall.S n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) { err = windows.WSARecvMsg(fd.Sysfd, msg, &qty, &o.o, nil) return qty, err - }) + }, nil) err = fd.eofError(n, err) var sa syscall.Sockaddr if err == nil { @@ -1391,7 +1363,7 @@ func (fd *FD) ReadMsgInet4(p []byte, oob []byte, flags int, sa4 *syscall.Sockadd n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) { err = windows.WSARecvMsg(fd.Sysfd, msg, &qty, &o.o, nil) return qty, err - }) + }, nil) err = fd.eofError(n, err) if err == nil { rawToSockaddrInet4(msg.Name, sa4) @@ -1415,7 +1387,7 @@ func (fd *FD) ReadMsgInet6(p []byte, oob []byte, flags int, sa6 *syscall.Sockadd n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) { err = windows.WSARecvMsg(fd.Sysfd, msg, &qty, &o.o, nil) return qty, err - }) + }, nil) err = fd.eofError(n, err) if err == nil { rawToSockaddrInet6(msg.Name, sa6) @@ -1446,7 +1418,7 @@ func (fd *FD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (int, int, err n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) { err = windows.WSASendMsg(fd.Sysfd, msg, 0, nil, &o.o, nil) return qty, err - }) + }, nil) return n, int(msg.Control.Len), err } @@ -1469,7 +1441,7 @@ func (fd *FD) WriteMsgInet4(p []byte, oob []byte, sa *syscall.SockaddrInet4) (in n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) { err = windows.WSASendMsg(fd.Sysfd, msg, 0, nil, &o.o, nil) return qty, err - }) + }, nil) return n, int(msg.Control.Len), err } @@ -1492,7 +1464,7 @@ func (fd *FD) WriteMsgInet6(p []byte, oob []byte, sa *syscall.SockaddrInet6) (in n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) { err = windows.WSASendMsg(fd.Sysfd, msg, 0, nil, &o.o, nil) return qty, err - }) + }, nil) return n, int(msg.Control.Len), err } diff --git a/src/internal/poll/sendfile_windows.go b/src/internal/poll/sendfile_windows.go index 2bdfecf013..0a5a8897dd 100644 --- a/src/internal/poll/sendfile_windows.go +++ b/src/internal/poll/sendfile_windows.go @@ -75,7 +75,7 @@ func SendFile(fd *FD, src uintptr, size int64) (written int64, err error, handle return 0, err } return uint32(chunkSize), nil - }) + }, nil) if err != nil { return written, err, written > 0 }