mode int32
}
-func (o *operation) setEvent() {
- h, err := windows.CreateEvent(nil, 0, 0, nil)
- if err != nil {
- // This shouldn't happen when all CreateEvent arguments are zero.
- panic(err)
- }
- // Set the low bit so that the external IOCP doesn't receive the completion packet.
- o.o.HEvent = h | 1
-}
-
-func (o *operation) close() {
- if o.o.HEvent != 0 {
- syscall.CloseHandle(o.o.HEvent)
- }
-}
-
func (fd *FD) overlapped(o *operation) *syscall.Overlapped {
if fd.isBlocking {
// Don't return the overlapped object if the file handle
},
}
+var operationPool = sync.Pool{
+ New: func() any {
+ return new(operation)
+ },
+}
+
// waitIO waits for the IO operation o to complete.
func (fd *FD) waitIO(o *operation) error {
if fd.isBlocking {
fd.pd.waitCanceled(int(o.mode))
}
+// pin pins ptr for the duration of the IO operation.
+// If fd is in blocking mode, pin does nothing.
+func (fd *FD) pin(mode int, ptr any) {
+ if fd.isBlocking {
+ return
+ }
+ if mode == 'r' {
+ fd.readPinner.Pin(ptr)
+ } else {
+ fd.writePinner.Pin(ptr)
+ }
+}
+
// execIO executes a single IO operation o.
// It supports both synchronous and asynchronous IO.
-// o.qty and o.flags are set to zero before calling submit
-// to avoid reusing the values from a previous call.
func (fd *FD) execIO(mode int, submit func(o *operation) (uint32, error)) (int, error) {
+ if mode == 'r' {
+ defer fd.readPinner.Unpin()
+ } else {
+ defer fd.writePinner.Unpin()
+ }
// Notify runtime netpoll about starting IO.
err := fd.pd.prepare(mode, fd.isFile)
if err != nil {
return 0, err
}
- o := &fd.rop
- if mode == 'w' {
- o = &fd.wop
+ o := operationPool.Get().(*operation)
+ defer operationPool.Put(o)
+ *o = operation{
+ o: syscall.Overlapped{
+ OffsetHigh: uint32(fd.offset >> 32),
+ Offset: uint32(fd.offset),
+ },
+ runtimeCtx: fd.pd.runtimeCtx,
+ mode: int32(mode),
}
// Start IO.
- if !fd.isBlocking && o.o.HEvent == 0 && !fd.pollable() {
+ if !fd.isBlocking && !fd.pollable() {
// If the handle is opened for overlapped IO but we can't
// use the runtime poller, then we need to use an
// event to wait for the IO to complete.
- o.setEvent()
+ h, err := windows.CreateEvent(nil, 0, 0, nil)
+ if err != nil {
+ // This shouldn't happen when all CreateEvent arguments are zero.
+ panic(err)
+ }
+ // Set the low bit so that the external IOCP doesn't receive the completion packet.
+ o.o.HEvent = h | 1
+ defer syscall.CloseHandle(h)
}
+ fd.pin(mode, o)
qty, err := submit(o)
var waitErr error
// Blocking operations shouldn't return ERROR_IO_PENDING.
// System file descriptor. Immutable until Close.
Sysfd syscall.Handle
- // Read operation.
- rop operation
- // Write operation.
- wop operation
-
// I/O poller.
pd pollDesc
disassociated atomic.Bool
+ // readPinner and writePinner are automatically unpinned
+ // before execIO returns.
readPinner runtime.Pinner
writePinner runtime.Pinner
}
// using an external mechanism.
func (fd *FD) setOffset(off int64) {
fd.offset = off
- fd.rop.o.OffsetHigh, fd.rop.o.Offset = uint32(off>>32), uint32(off)
- fd.wop.o.OffsetHigh, fd.wop.o.Offset = uint32(off>>32), uint32(off)
}
// addOffset adds the given offset to the current offset.
}
fd.isFile = fd.kind != kindNet
fd.isBlocking = !pollable
- fd.rop.mode = 'r'
- fd.wop.mode = 'w'
// It is safe to add overlapped handles that also perform I/O
// outside of the runtime poller. The runtime poller will ignore
if err != nil {
return err
}
- fd.rop.runtimeCtx = fd.pd.runtimeCtx
- fd.wop.runtimeCtx = fd.pd.runtimeCtx
if fd.kind != kindNet || socketCanUseSetFileCompletionNotificationModes {
// Non-socket handles can use SetFileCompletionNotificationModes without problems.
err := syscall.SetFileCompletionNotificationModes(fd.Sysfd,
if fd.Sysfd == syscall.InvalidHandle {
return syscall.EINVAL
}
- fd.rop.close()
- fd.wop.close()
// Poller may want to unregister fd in readiness notification mechanism,
// so this must be executed before fd.CloseFunc.
fd.pd.close()
defer fd.readUnlock()
}
- if len(buf) > 0 && !fd.isBlocking {
- fd.readPinner.Pin(&buf[0])
- defer fd.readPinner.Unpin()
+ if len(buf) > 0 {
+ fd.pin('r', &buf[0])
}
if len(buf) > maxRW {
}
defer fd.readWriteUnlock()
- if len(buf) > 0 && !fd.isBlocking {
- fd.readPinner.Pin(&buf[0])
- defer fd.readPinner.Unpin()
+ if len(buf) > 0 {
+ fd.pin('r', &buf[0])
}
if len(buf) > maxRW {
}
defer fd.readUnlock()
- if !fd.isBlocking {
- fd.readPinner.Pin(&buf[0])
- defer fd.readPinner.Unpin()
- }
+ fd.pin('r', &buf[0])
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
defer wsaRsaPool.Put(rsa)
}
defer fd.readUnlock()
- if !fd.isBlocking {
- fd.readPinner.Pin(&buf[0])
- defer fd.readPinner.Unpin()
- }
+ fd.pin('r', &buf[0])
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
defer wsaRsaPool.Put(rsa)
}
defer fd.readUnlock()
- if !fd.isBlocking {
- fd.readPinner.Pin(&buf[0])
- defer fd.readPinner.Unpin()
- }
+ fd.pin('r', &buf[0])
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
defer wsaRsaPool.Put(rsa)
defer fd.writeUnlock()
}
- if len(buf) > 0 && !fd.isBlocking {
- fd.writePinner.Pin(&buf[0])
- defer fd.writePinner.Unpin()
+ if len(buf) > 0 {
+ fd.pin('w', &buf[0])
}
-
var ntotal int
for {
max := len(buf)
}
defer fd.readWriteUnlock()
- if len(buf) > 0 && !fd.isBlocking {
- fd.writePinner.Pin(&buf[0])
- defer fd.writePinner.Unpin()
+ if len(buf) > 0 {
+ fd.pin('w', &buf[0])
}
if fd.isBlocking {
return n, err
}
- if !fd.isBlocking {
- fd.writePinner.Pin(&buf[0])
- defer fd.writePinner.Unpin()
- }
+ fd.pin('w', &buf[0])
ntotal := 0
for len(buf) > 0 {
return n, err
}
- if !fd.isBlocking {
- fd.writePinner.Pin(&buf[0])
- defer fd.writePinner.Unpin()
- }
+ fd.pin('w', &buf[0])
ntotal := 0
for len(buf) > 0 {
return n, err
}
- if !fd.isBlocking {
- fd.writePinner.Pin(&buf[0])
- defer fd.writePinner.Unpin()
- }
+ fd.pin('w', &buf[0])
ntotal := 0
for len(buf) > 0 {