From 9fb96991e63033ba963c7b1eff10e5c4f5a93b0a Mon Sep 17 00:00:00 2001 From: Dave Cheney Date: Wed, 5 Dec 2012 15:59:01 +1100 Subject: [PATCH] net: fix data races on deadline vars Fixes #4434. This proposal replaces the previous CL 6855110. Due to issue 599, 64-bit atomic operations should probably be avoided, so use a sync.Mutex instead. Benchmark comparisons against 025b9d070a85 on linux/386: CL 6855110: benchmark old ns/op new ns/op delta BenchmarkTCPOneShot 710024 727409 +2.45% BenchmarkTCPOneShotTimeout 758178 768620 +1.38% BenchmarkTCPPersistent 223464 228058 +2.06% BenchmarkTCPPersistentTimeout 234494 242600 +3.46% This proposal: benchmark old ns/op new ns/op delta BenchmarkTCPOneShot 710024 718492 +1.19% BenchmarkTCPOneShotTimeout 758178 748783 -1.24% BenchmarkTCPPersistent 223464 227628 +1.86% BenchmarkTCPPersistentTimeout 234494 238321 +1.63% R=rsc, dvyukov, mikioh.mikioh, alex.brainman, bradfitz CC=golang-dev, remyoudompheng https://golang.org/cl/6866050 --- src/pkg/net/fd_posix_test.go | 57 +++++++++++++++++++++ src/pkg/net/fd_unix.go | 88 +++++++++++++-------------------- src/pkg/net/fd_windows.go | 20 ++++---- src/pkg/net/net.go | 34 +++++++++++++ src/pkg/net/sendfile_freebsd.go | 2 +- src/pkg/net/sendfile_linux.go | 2 +- src/pkg/net/sock_posix.go | 6 +-- src/pkg/net/sockopt_posix.go | 21 +++----- 8 files changed, 147 insertions(+), 83 deletions(-) create mode 100644 src/pkg/net/fd_posix_test.go diff --git a/src/pkg/net/fd_posix_test.go b/src/pkg/net/fd_posix_test.go new file mode 100644 index 0000000000..8be0335d61 --- /dev/null +++ b/src/pkg/net/fd_posix_test.go @@ -0,0 +1,57 @@ +// Copyright 2012 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build darwin freebsd linux netbsd openbsd windows + +package net + +import ( + "testing" + "time" +) + +var deadlineSetTimeTests = []struct { + input time.Time + expected int64 +}{ + {time.Time{}, 0}, + {time.Date(2009, 11, 10, 23, 00, 00, 00, time.UTC), 1257894000000000000}, // 2009-11-10 23:00:00 +0000 UTC +} + +func TestDeadlineSetTime(t *testing.T) { + for _, tt := range deadlineSetTimeTests { + var d deadline + d.setTime(tt.input) + actual := d.value() + expected := int64(0) + if !tt.input.IsZero() { + expected = tt.input.UnixNano() + } + if actual != expected { + t.Errorf("set/value failed: expected %v, actual %v", expected, actual) + } + } +} + +var deadlineExpiredTests = []struct { + deadline time.Time + expired bool +}{ + // note, times are relative to the start of the test run, not + // the start of TestDeadlineExpired + {time.Now().Add(5 * time.Minute), false}, + {time.Now().Add(-5 * time.Minute), true}, + {time.Time{}, false}, // no deadline set +} + +func TestDeadlineExpired(t *testing.T) { + for _, tt := range deadlineExpiredTests { + var d deadline + d.set(tt.deadline.UnixNano()) + expired := d.expired() + if expired != tt.expired { + t.Errorf("expire failed: expected %v, actual %v", tt.expired, expired) + } + } +} diff --git a/src/pkg/net/fd_unix.go b/src/pkg/net/fd_unix.go index a9a2ae6e85..6d8af0ab2e 100644 --- a/src/pkg/net/fd_unix.go +++ b/src/pkg/net/fd_unix.go @@ -37,11 +37,11 @@ type netFD struct { laddr Addr raddr Addr - // owned by client - rdeadline int64 - rio sync.Mutex - wdeadline int64 - wio sync.Mutex + // serialize access to Read and Write methods + rio, wio sync.Mutex + + // read and write deadlines + rdeadline, wdeadline deadline // owned by fd wait server ncr, ncw int @@ -82,11 +82,11 @@ func (s *pollServer) AddFD(fd *netFD, mode int) error { key := intfd << 1 if mode == 'r' { fd.ncr++ - t = fd.rdeadline + t = fd.rdeadline.value() } else { fd.ncw++ key++ - t = fd.wdeadline + t = fd.wdeadline.value() } s.pending[key] = fd doWakeup := false @@ -153,12 +153,8 @@ func (s *pollServer) WakeFD(fd *netFD, mode int, err error) { } } -func (s *pollServer) Now() int64 { - return time.Now().UnixNano() -} - func (s *pollServer) CheckDeadlines() { - now := s.Now() + now := time.Now().UnixNano() // TODO(rsc): This will need to be handled more efficiently, // probably with a heap indexed by wakeup time. @@ -172,9 +168,9 @@ func (s *pollServer) CheckDeadlines() { mode = 'w' } if mode == 'r' { - t = fd.rdeadline + t = fd.rdeadline.value() } else { - t = fd.wdeadline + t = fd.wdeadline.value() } if t > 0 { if t <= now { @@ -198,15 +194,15 @@ func (s *pollServer) Run() { s.Lock() defer s.Unlock() for { - var t = s.deadline - if t > 0 { - t = t - s.Now() - if t <= 0 { + var timeout int64 // nsec to wait for or 0 for none + if s.deadline > 0 { + timeout = s.deadline - time.Now().UnixNano() + if timeout <= 0 { s.CheckDeadlines() continue } } - fd, mode, err := s.poll.WaitFD(s, t) + fd, mode, err := s.poll.WaitFD(s, timeout) if err != nil { print("pollServer WaitFD: ", err.Error(), "\n") return @@ -418,11 +414,9 @@ func (fd *netFD) Read(p []byte) (n int, err error) { } defer fd.decref() for { - if fd.rdeadline > 0 { - if time.Now().UnixNano() >= fd.rdeadline { - err = errTimeout - break - } + if fd.rdeadline.expired() { + err = errTimeout + break } n, err = syscall.Read(int(fd.sysfd), p) if err != nil { @@ -450,11 +444,9 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) { } defer fd.decref() for { - if fd.rdeadline > 0 { - if time.Now().UnixNano() >= fd.rdeadline { - err = errTimeout - break - } + if fd.rdeadline.expired() { + err = errTimeout + break } n, sa, err = syscall.Recvfrom(fd.sysfd, p, 0) if err != nil { @@ -482,15 +474,13 @@ func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S } defer fd.decref() for { - if fd.rdeadline > 0 { - if time.Now().UnixNano() >= fd.rdeadline { - err = errTimeout - break - } + if fd.rdeadline.expired() { + err = errTimeout + break } n, oobn, flags, sa, err = syscall.Recvmsg(fd.sysfd, p, oob, 0) if err != nil { - // TODO(dfc) should n and oobn be set to nil + // TODO(dfc) should n and oobn be set to 0 if err == syscall.EAGAIN { if err = fd.pollServer.WaitRead(fd); err == nil { continue @@ -513,21 +503,17 @@ func chkReadErr(n int, err error, fd *netFD) error { return err } -func (fd *netFD) Write(p []byte) (int, error) { +func (fd *netFD) Write(p []byte) (nn int, err error) { fd.wio.Lock() defer fd.wio.Unlock() if err := fd.incref(false); err != nil { return 0, err } defer fd.decref() - var err error - nn := 0 for { - if fd.wdeadline > 0 { - if time.Now().UnixNano() >= fd.wdeadline { - err = errTimeout - break - } + if fd.wdeadline.expired() { + err = errTimeout + break } var n int n, err = syscall.Write(int(fd.sysfd), p[nn:]) @@ -565,11 +551,9 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) { } defer fd.decref() for { - if fd.wdeadline > 0 { - if time.Now().UnixNano() >= fd.wdeadline { - err = errTimeout - break - } + if fd.wdeadline.expired() { + err = errTimeout + break } err = syscall.Sendto(fd.sysfd, p, 0, sa) if err == syscall.EAGAIN { @@ -595,11 +579,9 @@ func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob } defer fd.decref() for { - if fd.wdeadline > 0 { - if time.Now().UnixNano() >= fd.wdeadline { - err = errTimeout - break - } + if fd.wdeadline.expired() { + err = errTimeout + break } err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0) if err == syscall.EAGAIN { diff --git a/src/pkg/net/fd_windows.go b/src/pkg/net/fd_windows.go index 44b6663af9..18712191fe 100644 --- a/src/pkg/net/fd_windows.go +++ b/src/pkg/net/fd_windows.go @@ -285,11 +285,11 @@ type netFD struct { errnoc [2]chan error // read/write submit or cancel operation errors closec chan bool // used by Close to cancel pending IO - // owned by client - rdeadline int64 - rio sync.Mutex - wdeadline int64 - wio sync.Mutex + // serialize access to Read and Write methods + rio, wio sync.Mutex + + // read and write deadlines + rdeadline, wdeadline deadline } func allocFD(fd syscall.Handle, family, sotype int, net string) *netFD { @@ -426,7 +426,7 @@ func (fd *netFD) Read(buf []byte) (int, error) { defer fd.rio.Unlock() var o readOp o.Init(fd, buf, 'r') - n, err := iosrv.ExecIO(&o, fd.rdeadline) + n, err := iosrv.ExecIO(&o, fd.rdeadline.value()) if err == nil && n == 0 { err = io.EOF } @@ -463,7 +463,7 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) { var o readFromOp o.Init(fd, buf, 'r') o.rsan = int32(unsafe.Sizeof(o.rsa)) - n, err = iosrv.ExecIO(&o, fd.rdeadline) + n, err = iosrv.ExecIO(&o, fd.rdeadline.value()) if err != nil { return 0, nil, err } @@ -495,7 +495,7 @@ func (fd *netFD) Write(buf []byte) (int, error) { defer fd.wio.Unlock() var o writeOp o.Init(fd, buf, 'w') - return iosrv.ExecIO(&o, fd.wdeadline) + return iosrv.ExecIO(&o, fd.wdeadline.value()) } // WriteTo to network. @@ -527,7 +527,7 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { var o writeToOp o.Init(fd, buf, 'w') o.sa = sa - return iosrv.ExecIO(&o, fd.wdeadline) + return iosrv.ExecIO(&o, fd.wdeadline.value()) } // Accept new network connections. @@ -577,7 +577,7 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { var o acceptOp o.Init(fd, 'r') o.newsock = s - _, err = iosrv.ExecIO(&o, fd.rdeadline) + _, err = iosrv.ExecIO(&o, fd.rdeadline.value()) if err != nil { closesocket(s) return nil, err diff --git a/src/pkg/net/net.go b/src/pkg/net/net.go index 9af0514908..f60c1e4cb2 100644 --- a/src/pkg/net/net.go +++ b/src/pkg/net/net.go @@ -46,6 +46,7 @@ import ( "errors" "io" "os" + "sync" "syscall" "time" ) @@ -375,3 +376,36 @@ func genericReadFrom(w io.Writer, r io.Reader) (n int64, err error) { // Use wrapper to hide existing r.ReadFrom from io.Copy. return io.Copy(writerOnly{w}, r) } + +// deadline is an atomically-accessed number of nanoseconds since 1970 +// or 0, if no deadline is set. +type deadline struct { + sync.Mutex + val int64 +} + +func (d *deadline) expired() bool { + t := d.value() + return t > 0 && time.Now().UnixNano() >= t +} + +func (d *deadline) value() (v int64) { + d.Lock() + v = d.val + d.Unlock() + return +} + +func (d *deadline) set(v int64) { + d.Lock() + d.val = v + d.Unlock() +} + +func (d *deadline) setTime(t time.Time) { + if t.IsZero() { + d.set(0) + } else { + d.set(t.UnixNano()) + } +} diff --git a/src/pkg/net/sendfile_freebsd.go b/src/pkg/net/sendfile_freebsd.go index 8500006104..8008bc3b56 100644 --- a/src/pkg/net/sendfile_freebsd.go +++ b/src/pkg/net/sendfile_freebsd.go @@ -82,7 +82,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { if n == 0 && err1 == nil { break } - if err1 == syscall.EAGAIN && c.wdeadline >= 0 { + if err1 == syscall.EAGAIN { if err1 = c.pollServer.WaitWrite(c); err1 == nil { continue } diff --git a/src/pkg/net/sendfile_linux.go b/src/pkg/net/sendfile_linux.go index 5ee18f9ccc..3357e65386 100644 --- a/src/pkg/net/sendfile_linux.go +++ b/src/pkg/net/sendfile_linux.go @@ -58,7 +58,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { if n == 0 && err1 == nil { break } - if err1 == syscall.EAGAIN && c.wdeadline >= 0 { + if err1 == syscall.EAGAIN { if err1 = c.pollServer.WaitWrite(c); err1 == nil { continue } diff --git a/src/pkg/net/sock_posix.go b/src/pkg/net/sock_posix.go index dce5ec1132..12015ef0ac 100644 --- a/src/pkg/net/sock_posix.go +++ b/src/pkg/net/sock_posix.go @@ -56,15 +56,13 @@ func socket(net string, f, t, p int, ipv6only bool, ulsa, ursa syscall.Sockaddr, } if ursa != nil { - if !deadline.IsZero() { - fd.wdeadline = deadline.UnixNano() - } + fd.wdeadline.setTime(deadline) if err = fd.connect(ursa); err != nil { closesocket(s) return nil, err } fd.isConnected = true - fd.wdeadline = 0 + fd.wdeadline.set(0) } lsa, _ := syscall.Getsockname(s) diff --git a/src/pkg/net/sockopt_posix.go b/src/pkg/net/sockopt_posix.go index b139c42765..fe371fe0ce 100644 --- a/src/pkg/net/sockopt_posix.go +++ b/src/pkg/net/sockopt_posix.go @@ -119,29 +119,22 @@ func setWriteBuffer(fd *netFD, bytes int) error { return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_SNDBUF, bytes)) } +// TODO(dfc) these unused error returns could be removed + func setReadDeadline(fd *netFD, t time.Time) error { - if t.IsZero() { - fd.rdeadline = 0 - } else { - fd.rdeadline = t.UnixNano() - } + fd.rdeadline.setTime(t) return nil } func setWriteDeadline(fd *netFD, t time.Time) error { - if t.IsZero() { - fd.wdeadline = 0 - } else { - fd.wdeadline = t.UnixNano() - } + fd.wdeadline.setTime(t) return nil } func setDeadline(fd *netFD, t time.Time) error { - if err := setReadDeadline(fd, t); err != nil { - return err - } - return setWriteDeadline(fd, t) + setReadDeadline(fd, t) + setWriteDeadline(fd, t) + return nil } func setKeepAlive(fd *netFD, keepalive bool) error { -- 2.48.1