]> Cypherpunks repositories - gostls13.git/commitdiff
net: implement TCP connection setup with fast failover
authorMikio Hara <mikioh.mikioh@gmail.com>
Wed, 11 Sep 2013 14:48:53 +0000 (10:48 -0400)
committerRuss Cox <rsc@golang.org>
Wed, 11 Sep 2013 14:48:53 +0000 (10:48 -0400)
This CL adds minimal support of Happy Eyeballs-like TCP connection
setup to Dialer API. Happy Eyeballs and derivation techniques are
described in the following:

- Happy Eyeballs: Success with Dual-Stack Hosts
  http://tools.ietf.org/html/rfc6555

- Analysing Dual Stack Behaviour and IPv6 Quality
  http://www.potaroo.net/presentations/2012-04-17-dual-stack-quality.pdf

Usually, the techniques consist of three components below.

- DNS query racers, that run A and AAAA queries in parallel or series
- A short list of destination addresses
- TCP SYN racers, that run IPv4 and IPv6 transport in parallel or series

This CL implements only the latter two. The existing DNS query
component gathers together A and AAAA records in series, so we don't
touch it here. This CL just uses extended resolveInternetAddr and makes
it possible to run multiple Dial racers in parallel.

For example, when the given destination is a DNS name and the name has
multiple address family A and AAAA records, and it happens on the TCP
wildcard network "tcp" with DualStack=true like the following:

(&net.Dialer{DualStack: true}).Dial("tcp", "www.example.com:80")

The function will return a first established connection either TCP over
IPv4 or TCP over IPv6, and close the other connection internally.

Fixes #3610.
Fixes #5267.

Benchmark results on freebsd/amd64 virtual machine, tip vs. tip+12416043:

benchmark                           old ns/op    new ns/op    delta
BenchmarkTCP4OneShot                    50696        52141   +2.85%
BenchmarkTCP4OneShotTimeout             65775        66426   +0.99%
BenchmarkTCP4Persistent                 10986        10457   -4.82%
BenchmarkTCP4PersistentTimeout          11207        10445   -6.80%
BenchmarkTCP6OneShot                    62009        63718   +2.76%
BenchmarkTCP6OneShotTimeout             78351        79138   +1.00%
BenchmarkTCP6Persistent                 14695        14659   -0.24%
BenchmarkTCP6PersistentTimeout          15032        14646   -2.57%
BenchmarkTCP4ConcurrentReadWrite         7215         6217  -13.83%
BenchmarkTCP6ConcurrentReadWrite         7528         7493   -0.46%

benchmark                          old allocs   new allocs    delta
BenchmarkTCP4OneShot                       36           36    0.00%
BenchmarkTCP4OneShotTimeout                36           36    0.00%
BenchmarkTCP4Persistent                     0            0     n/a%
BenchmarkTCP4PersistentTimeout              0            0     n/a%
BenchmarkTCP6OneShot                       37           37    0.00%
BenchmarkTCP6OneShotTimeout                37           37    0.00%
BenchmarkTCP6Persistent                     0            0     n/a%
BenchmarkTCP6PersistentTimeout              0            0     n/a%
BenchmarkTCP4ConcurrentReadWrite            0            0     n/a%
BenchmarkTCP6ConcurrentReadWrite            0            0     n/a%

benchmark                           old bytes    new bytes    delta
BenchmarkTCP4OneShot                     2500         2503    0.12%
BenchmarkTCP4OneShotTimeout              2508         2505   -0.12%
BenchmarkTCP4Persistent                     0            0     n/a%
BenchmarkTCP4PersistentTimeout              0            0     n/a%
BenchmarkTCP6OneShot                     2713         2707   -0.22%
BenchmarkTCP6OneShotTimeout              2722         2720   -0.07%
BenchmarkTCP6Persistent                     0            0     n/a%
BenchmarkTCP6PersistentTimeout              0            0     n/a%
BenchmarkTCP4ConcurrentReadWrite            0            0     n/a%
BenchmarkTCP6ConcurrentReadWrite            0            0     n/a%

R=golang-dev, bradfitz, nightlyone, rsc
CC=golang-dev
https://golang.org/cl/12416043

src/pkg/net/dial.go
src/pkg/net/dial_gen.go
src/pkg/net/dial_test.go
src/pkg/net/dialgoogle_test.go
src/pkg/net/fd_plan9.go
src/pkg/net/fd_unix.go
src/pkg/net/fd_windows.go
src/pkg/net/mockserver_test.go [new file with mode: 0644]

index f0f47b215527a66b151e3aa640836a34a0ff37b7..fb47795d79f7c8396e43383ae087b675e37b4ef9 100644 (file)
@@ -37,6 +37,13 @@ type Dialer struct {
        // network being dialed.
        // If nil, a local address is automatically chosen.
        LocalAddr Addr
+
+       // DualStack allows a single dial to attempt to establish
+       // multiple IPv4 and IPv6 connections and to return the first
+       // established connection when the network is "tcp" and the
+       // destination is a host name that has multiple address family
+       // DNS records.
+       DualStack bool
 }
 
 // Return either now+Timeout or Deadline, whichever comes first.
@@ -143,10 +150,72 @@ func DialTimeout(network, address string, timeout time.Duration) (Conn, error) {
 // See func Dial for a description of the network and address
 // parameters.
 func (d *Dialer) Dial(network, address string) (Conn, error) {
-       return resolveAndDial(network, address, d.LocalAddr, d.deadline())
+       ra, err := resolveAddr("dial", network, address, d.deadline())
+       if err != nil {
+               return nil, &OpError{Op: "dial", Net: network, Addr: nil, Err: err}
+       }
+       dialer := func(deadline time.Time) (Conn, error) {
+               return dialSingle(network, address, d.LocalAddr, ra.toAddr(), deadline)
+       }
+       if ras, ok := ra.(addrList); ok && d.DualStack && network == "tcp" {
+               dialer = func(deadline time.Time) (Conn, error) {
+                       return dialMulti(network, address, d.LocalAddr, ras, deadline)
+               }
+       }
+       return dial(network, ra.toAddr(), dialer, d.deadline())
 }
 
-func dial(net, addr string, la, ra Addr, deadline time.Time) (Conn, error) {
+// dialMulti attempts to establish connections to each destination of
+// the list of addresses. It will return the first established
+// connection and close the other connections. Otherwise it returns
+// error on the last attempt.
+func dialMulti(net, addr string, la Addr, ras addrList, deadline time.Time) (Conn, error) {
+       type racer struct {
+               Conn
+               Addr
+               error
+       }
+       // Sig controls the flow of dial results on lane. It passes a
+       // token to the next racer and also indicates the end of flow
+       // by using closed channel.
+       sig := make(chan bool, 1)
+       lane := make(chan racer, 1)
+       for _, ra := range ras {
+               go func(ra Addr) {
+                       c, err := dialSingle(net, addr, la, ra, deadline)
+                       if _, ok := <-sig; ok {
+                               lane <- racer{c, ra, err}
+                       } else if err == nil {
+                               // We have to return the resources
+                               // that belong to the other
+                               // connections here for avoiding
+                               // unnecessary resource starvation.
+                               c.Close()
+                       }
+               }(ra.toAddr())
+       }
+       defer close(sig)
+       var failAddr Addr
+       lastErr := errTimeout
+       nracers := len(ras)
+       for nracers > 0 {
+               sig <- true
+               select {
+               case racer := <-lane:
+                       if racer.error == nil {
+                               return racer.Conn, nil
+                       }
+                       failAddr = racer.Addr
+                       lastErr = racer.error
+                       nracers--
+               }
+       }
+       return nil, &OpError{Op: "dial", Net: net, Addr: failAddr, Err: lastErr}
+}
+
+// dialSingle attempts to establish and returns a single connection to
+// the destination address.
+func dialSingle(net, addr string, la, ra Addr, deadline time.Time) (Conn, error) {
        if la != nil && la.Network() != ra.Network() {
                return nil, &OpError{Op: "dial", Net: net, Addr: ra, Err: errors.New("mismatched local address type " + la.Network())}
        }
@@ -168,13 +237,6 @@ func dial(net, addr string, la, ra Addr, deadline time.Time) (Conn, error) {
        }
 }
 
-type stringAddr struct {
-       net, addr string
-}
-
-func (a stringAddr) Network() string { return a.net }
-func (a stringAddr) String() string  { return a.addr }
-
 // Listen announces on the local network address laddr.
 // The network net must be a stream-oriented network: "tcp", "tcp4",
 // "tcp6", "unix" or "unixpacket".
index f051cdaa844653ed8eaa816ddd62ae6acaccc20a..ada6233003fececed91924146c274f5d2661a6ed 100644 (file)
@@ -12,62 +12,35 @@ import (
 
 var testingIssue5349 bool // used during tests
 
-// resolveAndDialChannel is the simple pure-Go implementation of
-// resolveAndDial, still used on operating systems where the deadline
-// hasn't been pushed down into the pollserver. (Plan 9 and some old
-// versions of Windows)
-func resolveAndDialChannel(net, addr string, localAddr Addr, deadline time.Time) (Conn, error) {
+// dialChannel is the simple pure-Go implementation of dial, still
+// used on operating systems where the deadline hasn't been pushed
+// down into the pollserver. (Plan 9 and some old versions of Windows)
+func dialChannel(net string, ra Addr, dialer func(time.Time) (Conn, error), deadline time.Time) (Conn, error) {
        var timeout time.Duration
        if !deadline.IsZero() {
                timeout = deadline.Sub(time.Now())
        }
        if timeout <= 0 {
-               ra, err := resolveAddr("dial", net, addr, noDeadline)
-               if err != nil {
-                       return nil, &OpError{Op: "dial", Net: net, Addr: nil, Err: err}
-               }
-               return dial(net, addr, localAddr, ra.toAddr(), noDeadline)
+               return dialer(noDeadline)
        }
        t := time.NewTimer(timeout)
        defer t.Stop()
-       type pair struct {
+       type racer struct {
                Conn
                error
        }
-       ch := make(chan pair, 1)
-       resolvedAddr := make(chan Addr, 1)
+       ch := make(chan racer, 1)
        go func() {
                if testingIssue5349 {
                        time.Sleep(time.Millisecond)
                }
-               ra, err := resolveAddr("dial", net, addr, noDeadline)
-               if err != nil {
-                       ch <- pair{nil, &OpError{Op: "dial", Net: net, Addr: nil, Err: err}}
-                       return
-               }
-               resolvedAddr <- ra.toAddr() // in case we need it for OpError
-               c, err := dial(net, addr, localAddr, ra.toAddr(), noDeadline)
-               ch <- pair{c, err}
+               c, err := dialer(noDeadline)
+               ch <- racer{c, err}
        }()
        select {
        case <-t.C:
-               // Try to use the real Addr in our OpError, if we resolved it
-               // before the timeout. Otherwise we just use stringAddr.
-               var ra Addr
-               select {
-               case a := <-resolvedAddr:
-                       ra = a
-               default:
-                       ra = &stringAddr{net, addr}
-               }
-               err := &OpError{
-                       Op:   "dial",
-                       Net:  net,
-                       Addr: ra,
-                       Err:  errTimeout,
-               }
-               return nil, err
-       case p := <-ch:
-               return p.Conn, p.error
+               return nil, &OpError{Op: "dial", Net: net, Addr: ra, Err: errTimeout}
+       case racer := <-ch:
+               return racer.Conn, racer.error
        }
 }
index d79c8a536f92f07939a74e39b21543af909412c5..74391bbde76430c10a15180bfae755cfde81edf8 100644 (file)
@@ -5,13 +5,17 @@
 package net
 
 import (
+       "bytes"
        "flag"
        "fmt"
        "io"
        "os"
+       "os/exec"
        "reflect"
        "regexp"
        "runtime"
+       "strconv"
+       "sync"
        "testing"
        "time"
 )
@@ -314,6 +318,96 @@ func TestDialTimeoutFDLeak(t *testing.T) {
        }
 }
 
+func numTCP() (ntcp, nopen, nclose int, err error) {
+       lsof, err := exec.Command("lsof", "-n", "-p", strconv.Itoa(os.Getpid())).Output()
+       if err != nil {
+               return 0, 0, 0, err
+       }
+       ntcp += bytes.Count(lsof, []byte("TCP"))
+       for _, state := range []string{"LISTEN", "SYN_SENT", "SYN_RECEIVED", "ESTABLISHED"} {
+               nopen += bytes.Count(lsof, []byte(state))
+       }
+       for _, state := range []string{"CLOSED", "CLOSE_WAIT", "LAST_ACK", "FIN_WAIT_1", "FIN_WAIT_2", "CLOSING", "TIME_WAIT"} {
+               nclose += bytes.Count(lsof, []byte(state))
+       }
+       return ntcp, nopen, nclose, nil
+}
+
+func TestDialMultiFDLeak(t *testing.T) {
+       if !supportsIPv4 || !supportsIPv6 {
+               t.Skip("neither ipv4 nor ipv6 is supported")
+       }
+
+       halfDeadServer := func(dss *dualStackServer, ln Listener) {
+               for {
+                       if c, err := ln.Accept(); err != nil {
+                               return
+                       } else {
+                               // It just keeps established
+                               // connections like a half-dead server
+                               // does.
+                               dss.putConn(c)
+                       }
+               }
+       }
+       dss, err := newDualStackServer([]streamListener{
+               {net: "tcp4", addr: "127.0.0.1"},
+               {net: "tcp6", addr: "[::1]"},
+       })
+       if err != nil {
+               t.Fatalf("newDualStackServer failed: %v", err)
+       }
+       defer dss.teardown()
+       if err := dss.buildup(halfDeadServer); err != nil {
+               t.Fatalf("dualStackServer.buildup failed: %v", err)
+       }
+
+       _, before, _, err := numTCP()
+       if err != nil {
+               t.Skipf("skipping test; error finding or running lsof: %v", err)
+       }
+
+       var wg sync.WaitGroup
+       portnum, _, _ := dtoi(dss.port, 0)
+       ras := addrList{
+               // Losers that will fail to connect, see RFC 6890.
+               &TCPAddr{IP: IPv4(198, 18, 0, 254), Port: portnum},
+               &TCPAddr{IP: ParseIP("2001:2::254"), Port: portnum},
+
+               // Winner candidates of this race.
+               &TCPAddr{IP: IPv4(127, 0, 0, 1), Port: portnum},
+               &TCPAddr{IP: IPv6loopback, Port: portnum},
+
+               // Losers that will have established connections.
+               &TCPAddr{IP: IPv4(127, 0, 0, 1), Port: portnum},
+               &TCPAddr{IP: IPv6loopback, Port: portnum},
+       }
+       const T1 = 10 * time.Millisecond
+       const T2 = 2 * T1
+       const N = 10
+       for i := 0; i < N; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       if c, err := dialMulti("tcp", "fast failover test", nil, ras, time.Now().Add(T1)); err == nil {
+                               c.Close()
+                       }
+               }()
+       }
+       wg.Wait()
+       time.Sleep(T2)
+
+       ntcp, after, nclose, err := numTCP()
+       if err != nil {
+               t.Skipf("skipping test; error finding or running lsof: %v", err)
+       }
+       t.Logf("tcp sessions: %v, open sessions: %v, closing sessions: %v", ntcp, after, nclose)
+
+       if after != before {
+               t.Fatalf("got %v open sessions; expected %v", after, before)
+       }
+}
+
 func numFD() int {
        if runtime.GOOS == "linux" {
                f, err := os.Open("/proc/self/fd")
@@ -404,3 +498,46 @@ func TestDialer(t *testing.T) {
                t.Error(err)
        }
 }
+
+func TestDialDualStackLocalhost(t *testing.T) {
+       if ips, err := LookupIP("localhost"); err != nil {
+               t.Fatalf("LookupIP failed: %v", err)
+       } else if len(ips) < 2 || !supportsIPv4 || !supportsIPv6 {
+               t.Skip("localhost doesn't have a pair of different address family IP addresses")
+       }
+
+       touchAndByeServer := func(dss *dualStackServer, ln Listener) {
+               for {
+                       if c, err := ln.Accept(); err != nil {
+                               return
+                       } else {
+                               c.Close()
+                       }
+               }
+       }
+       dss, err := newDualStackServer([]streamListener{
+               {net: "tcp4", addr: "127.0.0.1"},
+               {net: "tcp6", addr: "[::1]"},
+       })
+       if err != nil {
+               t.Fatalf("newDualStackServer failed: %v", err)
+       }
+       defer dss.teardown()
+       if err := dss.buildup(touchAndByeServer); err != nil {
+               t.Fatalf("dualStackServer.buildup failed: %v", err)
+       }
+
+       d := &Dialer{DualStack: true}
+       for _ = range dss.lns {
+               if c, err := d.Dial("tcp", "localhost:"+dss.port); err != nil {
+                       t.Errorf("Dial failed: %v", err)
+               } else {
+                       if addr := c.LocalAddr().(*TCPAddr); addr.IP.To4() != nil {
+                               dss.teardownNetwork("tcp4")
+                       } else if addr.IP.To16() != nil && addr.IP.To4() == nil {
+                               dss.teardownNetwork("tcp6")
+                       }
+                       c.Close()
+               }
+       }
+}
index 000e1c323a3c57eede19d2a61f3675d1b5c359ea..b4ebad0e0dc8733920ab31d3dcaaeb435b0bb2da 100644 (file)
@@ -41,6 +41,34 @@ func TestResolveGoogle(t *testing.T) {
        }
 }
 
+func TestDialGoogle(t *testing.T) {
+       if testing.Short() || !*testExternal {
+               t.Skip("skipping test to avoid external network")
+       }
+
+       d := &Dialer{DualStack: true}
+       for _, network := range []string{"tcp", "tcp4", "tcp6"} {
+               if network == "tcp" && !supportsIPv4 && !supportsIPv6 {
+                       t.Logf("skipping test; both ipv4 and ipv6 are not supported")
+                       continue
+               } else if network == "tcp4" && !supportsIPv4 {
+                       t.Logf("skipping test; ipv4 is not supported")
+                       continue
+               } else if network == "tcp6" && !supportsIPv6 {
+                       t.Logf("skipping test; ipv6 is not supported")
+                       continue
+               } else if network == "tcp6" && !*testIPv6 {
+                       t.Logf("test disabled; use -ipv6 to enable")
+                       continue
+               }
+               if c, err := d.Dial(network, "www.google.com:http"); err != nil {
+                       t.Errorf("Dial failed: %v", err)
+               } else {
+                       c.Close()
+               }
+       }
+}
+
 // fd is already connected to the destination, port 80.
 // Run an HTTP request to fetch the appropriate page.
 func fetchGoogle(t *testing.T, fd Conn, network, addr string) {
index 0d9dc54408680b38f33599f1189f3146185cad47..38515f20e3dc03cbd66ab8716ba35f37f258a122 100644 (file)
@@ -21,10 +21,10 @@ type netFD struct {
 func sysInit() {
 }
 
-func resolveAndDial(net, addr string, localAddr Addr, deadline time.Time) (Conn, error) {
+func dial(net string, ra Addr, dialer func(time.Time) (Conn, error), deadline time.Time) (Conn, error) {
        // On plan9, use the relatively inefficient
        // goroutine-racing implementation.
-       return resolveAndDialChannel(net, addr, localAddr, deadline)
+       return dialChannel(net, ra, dialer, deadline)
 }
 
 func newFD(proto, name string, ctl, data *os.File, laddr, raddr Addr) *netFD {
index 457c1d18e28bca9b08e3596dfbff15df55dbddaa..2e62ba0ec47e315e6b3fdc22c5f3b940e5546351 100644 (file)
@@ -36,12 +36,8 @@ type netFD struct {
 func sysInit() {
 }
 
-func resolveAndDial(net, addr string, localAddr Addr, deadline time.Time) (Conn, error) {
-       ra, err := resolveAddr("dial", net, addr, deadline)
-       if err != nil {
-               return nil, &OpError{Op: "dial", Net: net, Addr: nil, Err: err}
-       }
-       return dial(net, addr, localAddr, ra.toAddr(), deadline)
+func dial(network string, ra Addr, dialer func(time.Time) (Conn, error), deadline time.Time) (Conn, error) {
+       return dialer(deadline)
 }
 
 func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
index 6f344057c7a82e978fbf3144bbc32f500b0aa8b6..d480fb40579308800b8c838d50cf88296f8bdd0e 100644 (file)
@@ -83,17 +83,13 @@ func canUseConnectEx(net string) bool {
        return syscall.LoadConnectEx() == nil
 }
 
-func resolveAndDial(net, addr string, localAddr Addr, deadline time.Time) (Conn, error) {
+func dial(net string, ra Addr, dialer func(time.Time) (Conn, error), deadline time.Time) (Conn, error) {
        if !canUseConnectEx(net) {
                // Use the relatively inefficient goroutine-racing
                // implementation of DialTimeout.
-               return resolveAndDialChannel(net, addr, localAddr, deadline)
+               return dialChannel(net, ra, dialer, deadline)
        }
-       ra, err := resolveAddr("dial", net, addr, deadline)
-       if err != nil {
-               return nil, &OpError{Op: "dial", Net: net, Addr: nil, Err: err}
-       }
-       return dial(net, addr, localAddr, ra.toAddr(), deadline)
+       return dialer(deadline)
 }
 
 // operation contains superset of data necessary to perform all async IO.
diff --git a/src/pkg/net/mockserver_test.go b/src/pkg/net/mockserver_test.go
new file mode 100644 (file)
index 0000000..68ded5d
--- /dev/null
@@ -0,0 +1,82 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import "sync"
+
+type streamListener struct {
+       net, addr string
+       ln        Listener
+}
+
+type dualStackServer struct {
+       lnmu sync.RWMutex
+       lns  []streamListener
+       port string
+
+       cmu sync.RWMutex
+       cs  []Conn // established connections at the passive open side
+}
+
+func (dss *dualStackServer) buildup(server func(*dualStackServer, Listener)) error {
+       for i := range dss.lns {
+               go server(dss, dss.lns[i].ln)
+       }
+       return nil
+}
+
+func (dss *dualStackServer) putConn(c Conn) error {
+       dss.cmu.Lock()
+       dss.cs = append(dss.cs, c)
+       dss.cmu.Unlock()
+       return nil
+}
+
+func (dss *dualStackServer) teardownNetwork(net string) error {
+       dss.lnmu.Lock()
+       for i := range dss.lns {
+               if net == dss.lns[i].net && dss.lns[i].ln != nil {
+                       dss.lns[i].ln.Close()
+                       dss.lns[i].ln = nil
+               }
+       }
+       dss.lnmu.Unlock()
+       return nil
+}
+
+func (dss *dualStackServer) teardown() error {
+       dss.lnmu.Lock()
+       for i := range dss.lns {
+               if dss.lns[i].ln != nil {
+                       dss.lns[i].ln.Close()
+               }
+       }
+       dss.lnmu.Unlock()
+       dss.cmu.Lock()
+       for _, c := range dss.cs {
+               c.Close()
+       }
+       dss.cmu.Unlock()
+       return nil
+}
+
+func newDualStackServer(lns []streamListener) (*dualStackServer, error) {
+       dss := &dualStackServer{lns: lns, port: "0"}
+       for i := range dss.lns {
+               ln, err := Listen(dss.lns[i].net, dss.lns[i].addr+":"+dss.port)
+               if err != nil {
+                       dss.teardown()
+                       return nil, err
+               }
+               dss.lns[i].ln = ln
+               if dss.port == "0" {
+                       if _, dss.port, err = SplitHostPort(ln.Addr().String()); err != nil {
+                               dss.teardown()
+                               return nil, err
+                       }
+               }
+       }
+       return dss, nil
+}