This CL adds minimal support of Happy Eyeballs-like TCP connection
setup to Dialer API. Happy Eyeballs and derivation techniques are
described in the following:
- Happy Eyeballs: Success with Dual-Stack Hosts
http://tools.ietf.org/html/rfc6555
- Analysing Dual Stack Behaviour and IPv6 Quality
http://www.potaroo.net/presentations/2012-04-17-dual-stack-quality.pdf
Usually, the techniques consist of three components below.
- DNS query racers, that run A and AAAA queries in parallel or series
- A short list of destination addresses
- TCP SYN racers, that run IPv4 and IPv6 transport in parallel or series
This CL implements only the latter two. The existing DNS query
component gathers together A and AAAA records in series, so we don't
touch it here. This CL just uses extended resolveInternetAddr and makes
it possible to run multiple Dial racers in parallel.
For example, when the given destination is a DNS name and the name has
multiple address family A and AAAA records, and it happens on the TCP
wildcard network "tcp" with DualStack=true like the following:
(&net.Dialer{DualStack: true}).Dial("tcp", "www.example.com:80")
The function will return a first established connection either TCP over
IPv4 or TCP over IPv6, and close the other connection internally.
Fixes #3610.
Fixes #5267.
Benchmark results on freebsd/amd64 virtual machine, tip vs. tip+
12416043:
benchmark old ns/op new ns/op delta
BenchmarkTCP4OneShot 50696 52141 +2.85%
BenchmarkTCP4OneShotTimeout 65775 66426 +0.99%
BenchmarkTCP4Persistent 10986 10457 -4.82%
BenchmarkTCP4PersistentTimeout 11207 10445 -6.80%
BenchmarkTCP6OneShot 62009 63718 +2.76%
BenchmarkTCP6OneShotTimeout 78351 79138 +1.00%
BenchmarkTCP6Persistent 14695 14659 -0.24%
BenchmarkTCP6PersistentTimeout 15032 14646 -2.57%
BenchmarkTCP4ConcurrentReadWrite 7215 6217 -13.83%
BenchmarkTCP6ConcurrentReadWrite 7528 7493 -0.46%
benchmark old allocs new allocs delta
BenchmarkTCP4OneShot 36 36 0.00%
BenchmarkTCP4OneShotTimeout 36 36 0.00%
BenchmarkTCP4Persistent 0 0 n/a%
BenchmarkTCP4PersistentTimeout 0 0 n/a%
BenchmarkTCP6OneShot 37 37 0.00%
BenchmarkTCP6OneShotTimeout 37 37 0.00%
BenchmarkTCP6Persistent 0 0 n/a%
BenchmarkTCP6PersistentTimeout 0 0 n/a%
BenchmarkTCP4ConcurrentReadWrite 0 0 n/a%
BenchmarkTCP6ConcurrentReadWrite 0 0 n/a%
benchmark old bytes new bytes delta
BenchmarkTCP4OneShot 2500 2503 0.12%
BenchmarkTCP4OneShotTimeout 2508 2505 -0.12%
BenchmarkTCP4Persistent 0 0 n/a%
BenchmarkTCP4PersistentTimeout 0 0 n/a%
BenchmarkTCP6OneShot 2713 2707 -0.22%
BenchmarkTCP6OneShotTimeout 2722 2720 -0.07%
BenchmarkTCP6Persistent 0 0 n/a%
BenchmarkTCP6PersistentTimeout 0 0 n/a%
BenchmarkTCP4ConcurrentReadWrite 0 0 n/a%
BenchmarkTCP6ConcurrentReadWrite 0 0 n/a%
R=golang-dev, bradfitz, nightlyone, rsc
CC=golang-dev
https://golang.org/cl/
12416043
// network being dialed.
// If nil, a local address is automatically chosen.
LocalAddr Addr
+
+ // DualStack allows a single dial to attempt to establish
+ // multiple IPv4 and IPv6 connections and to return the first
+ // established connection when the network is "tcp" and the
+ // destination is a host name that has multiple address family
+ // DNS records.
+ DualStack bool
}
// Return either now+Timeout or Deadline, whichever comes first.
// See func Dial for a description of the network and address
// parameters.
func (d *Dialer) Dial(network, address string) (Conn, error) {
- return resolveAndDial(network, address, d.LocalAddr, d.deadline())
+ ra, err := resolveAddr("dial", network, address, d.deadline())
+ if err != nil {
+ return nil, &OpError{Op: "dial", Net: network, Addr: nil, Err: err}
+ }
+ dialer := func(deadline time.Time) (Conn, error) {
+ return dialSingle(network, address, d.LocalAddr, ra.toAddr(), deadline)
+ }
+ if ras, ok := ra.(addrList); ok && d.DualStack && network == "tcp" {
+ dialer = func(deadline time.Time) (Conn, error) {
+ return dialMulti(network, address, d.LocalAddr, ras, deadline)
+ }
+ }
+ return dial(network, ra.toAddr(), dialer, d.deadline())
}
-func dial(net, addr string, la, ra Addr, deadline time.Time) (Conn, error) {
+// dialMulti attempts to establish connections to each destination of
+// the list of addresses. It will return the first established
+// connection and close the other connections. Otherwise it returns
+// error on the last attempt.
+func dialMulti(net, addr string, la Addr, ras addrList, deadline time.Time) (Conn, error) {
+ type racer struct {
+ Conn
+ Addr
+ error
+ }
+ // Sig controls the flow of dial results on lane. It passes a
+ // token to the next racer and also indicates the end of flow
+ // by using closed channel.
+ sig := make(chan bool, 1)
+ lane := make(chan racer, 1)
+ for _, ra := range ras {
+ go func(ra Addr) {
+ c, err := dialSingle(net, addr, la, ra, deadline)
+ if _, ok := <-sig; ok {
+ lane <- racer{c, ra, err}
+ } else if err == nil {
+ // We have to return the resources
+ // that belong to the other
+ // connections here for avoiding
+ // unnecessary resource starvation.
+ c.Close()
+ }
+ }(ra.toAddr())
+ }
+ defer close(sig)
+ var failAddr Addr
+ lastErr := errTimeout
+ nracers := len(ras)
+ for nracers > 0 {
+ sig <- true
+ select {
+ case racer := <-lane:
+ if racer.error == nil {
+ return racer.Conn, nil
+ }
+ failAddr = racer.Addr
+ lastErr = racer.error
+ nracers--
+ }
+ }
+ return nil, &OpError{Op: "dial", Net: net, Addr: failAddr, Err: lastErr}
+}
+
+// dialSingle attempts to establish and returns a single connection to
+// the destination address.
+func dialSingle(net, addr string, la, ra Addr, deadline time.Time) (Conn, error) {
if la != nil && la.Network() != ra.Network() {
return nil, &OpError{Op: "dial", Net: net, Addr: ra, Err: errors.New("mismatched local address type " + la.Network())}
}
}
}
-type stringAddr struct {
- net, addr string
-}
-
-func (a stringAddr) Network() string { return a.net }
-func (a stringAddr) String() string { return a.addr }
-
// Listen announces on the local network address laddr.
// The network net must be a stream-oriented network: "tcp", "tcp4",
// "tcp6", "unix" or "unixpacket".
var testingIssue5349 bool // used during tests
-// resolveAndDialChannel is the simple pure-Go implementation of
-// resolveAndDial, still used on operating systems where the deadline
-// hasn't been pushed down into the pollserver. (Plan 9 and some old
-// versions of Windows)
-func resolveAndDialChannel(net, addr string, localAddr Addr, deadline time.Time) (Conn, error) {
+// dialChannel is the simple pure-Go implementation of dial, still
+// used on operating systems where the deadline hasn't been pushed
+// down into the pollserver. (Plan 9 and some old versions of Windows)
+func dialChannel(net string, ra Addr, dialer func(time.Time) (Conn, error), deadline time.Time) (Conn, error) {
var timeout time.Duration
if !deadline.IsZero() {
timeout = deadline.Sub(time.Now())
}
if timeout <= 0 {
- ra, err := resolveAddr("dial", net, addr, noDeadline)
- if err != nil {
- return nil, &OpError{Op: "dial", Net: net, Addr: nil, Err: err}
- }
- return dial(net, addr, localAddr, ra.toAddr(), noDeadline)
+ return dialer(noDeadline)
}
t := time.NewTimer(timeout)
defer t.Stop()
- type pair struct {
+ type racer struct {
Conn
error
}
- ch := make(chan pair, 1)
- resolvedAddr := make(chan Addr, 1)
+ ch := make(chan racer, 1)
go func() {
if testingIssue5349 {
time.Sleep(time.Millisecond)
}
- ra, err := resolveAddr("dial", net, addr, noDeadline)
- if err != nil {
- ch <- pair{nil, &OpError{Op: "dial", Net: net, Addr: nil, Err: err}}
- return
- }
- resolvedAddr <- ra.toAddr() // in case we need it for OpError
- c, err := dial(net, addr, localAddr, ra.toAddr(), noDeadline)
- ch <- pair{c, err}
+ c, err := dialer(noDeadline)
+ ch <- racer{c, err}
}()
select {
case <-t.C:
- // Try to use the real Addr in our OpError, if we resolved it
- // before the timeout. Otherwise we just use stringAddr.
- var ra Addr
- select {
- case a := <-resolvedAddr:
- ra = a
- default:
- ra = &stringAddr{net, addr}
- }
- err := &OpError{
- Op: "dial",
- Net: net,
- Addr: ra,
- Err: errTimeout,
- }
- return nil, err
- case p := <-ch:
- return p.Conn, p.error
+ return nil, &OpError{Op: "dial", Net: net, Addr: ra, Err: errTimeout}
+ case racer := <-ch:
+ return racer.Conn, racer.error
}
}
package net
import (
+ "bytes"
"flag"
"fmt"
"io"
"os"
+ "os/exec"
"reflect"
"regexp"
"runtime"
+ "strconv"
+ "sync"
"testing"
"time"
)
}
}
+func numTCP() (ntcp, nopen, nclose int, err error) {
+ lsof, err := exec.Command("lsof", "-n", "-p", strconv.Itoa(os.Getpid())).Output()
+ if err != nil {
+ return 0, 0, 0, err
+ }
+ ntcp += bytes.Count(lsof, []byte("TCP"))
+ for _, state := range []string{"LISTEN", "SYN_SENT", "SYN_RECEIVED", "ESTABLISHED"} {
+ nopen += bytes.Count(lsof, []byte(state))
+ }
+ for _, state := range []string{"CLOSED", "CLOSE_WAIT", "LAST_ACK", "FIN_WAIT_1", "FIN_WAIT_2", "CLOSING", "TIME_WAIT"} {
+ nclose += bytes.Count(lsof, []byte(state))
+ }
+ return ntcp, nopen, nclose, nil
+}
+
+func TestDialMultiFDLeak(t *testing.T) {
+ if !supportsIPv4 || !supportsIPv6 {
+ t.Skip("neither ipv4 nor ipv6 is supported")
+ }
+
+ halfDeadServer := func(dss *dualStackServer, ln Listener) {
+ for {
+ if c, err := ln.Accept(); err != nil {
+ return
+ } else {
+ // It just keeps established
+ // connections like a half-dead server
+ // does.
+ dss.putConn(c)
+ }
+ }
+ }
+ dss, err := newDualStackServer([]streamListener{
+ {net: "tcp4", addr: "127.0.0.1"},
+ {net: "tcp6", addr: "[::1]"},
+ })
+ if err != nil {
+ t.Fatalf("newDualStackServer failed: %v", err)
+ }
+ defer dss.teardown()
+ if err := dss.buildup(halfDeadServer); err != nil {
+ t.Fatalf("dualStackServer.buildup failed: %v", err)
+ }
+
+ _, before, _, err := numTCP()
+ if err != nil {
+ t.Skipf("skipping test; error finding or running lsof: %v", err)
+ }
+
+ var wg sync.WaitGroup
+ portnum, _, _ := dtoi(dss.port, 0)
+ ras := addrList{
+ // Losers that will fail to connect, see RFC 6890.
+ &TCPAddr{IP: IPv4(198, 18, 0, 254), Port: portnum},
+ &TCPAddr{IP: ParseIP("2001:2::254"), Port: portnum},
+
+ // Winner candidates of this race.
+ &TCPAddr{IP: IPv4(127, 0, 0, 1), Port: portnum},
+ &TCPAddr{IP: IPv6loopback, Port: portnum},
+
+ // Losers that will have established connections.
+ &TCPAddr{IP: IPv4(127, 0, 0, 1), Port: portnum},
+ &TCPAddr{IP: IPv6loopback, Port: portnum},
+ }
+ const T1 = 10 * time.Millisecond
+ const T2 = 2 * T1
+ const N = 10
+ for i := 0; i < N; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if c, err := dialMulti("tcp", "fast failover test", nil, ras, time.Now().Add(T1)); err == nil {
+ c.Close()
+ }
+ }()
+ }
+ wg.Wait()
+ time.Sleep(T2)
+
+ ntcp, after, nclose, err := numTCP()
+ if err != nil {
+ t.Skipf("skipping test; error finding or running lsof: %v", err)
+ }
+ t.Logf("tcp sessions: %v, open sessions: %v, closing sessions: %v", ntcp, after, nclose)
+
+ if after != before {
+ t.Fatalf("got %v open sessions; expected %v", after, before)
+ }
+}
+
func numFD() int {
if runtime.GOOS == "linux" {
f, err := os.Open("/proc/self/fd")
t.Error(err)
}
}
+
+func TestDialDualStackLocalhost(t *testing.T) {
+ if ips, err := LookupIP("localhost"); err != nil {
+ t.Fatalf("LookupIP failed: %v", err)
+ } else if len(ips) < 2 || !supportsIPv4 || !supportsIPv6 {
+ t.Skip("localhost doesn't have a pair of different address family IP addresses")
+ }
+
+ touchAndByeServer := func(dss *dualStackServer, ln Listener) {
+ for {
+ if c, err := ln.Accept(); err != nil {
+ return
+ } else {
+ c.Close()
+ }
+ }
+ }
+ dss, err := newDualStackServer([]streamListener{
+ {net: "tcp4", addr: "127.0.0.1"},
+ {net: "tcp6", addr: "[::1]"},
+ })
+ if err != nil {
+ t.Fatalf("newDualStackServer failed: %v", err)
+ }
+ defer dss.teardown()
+ if err := dss.buildup(touchAndByeServer); err != nil {
+ t.Fatalf("dualStackServer.buildup failed: %v", err)
+ }
+
+ d := &Dialer{DualStack: true}
+ for _ = range dss.lns {
+ if c, err := d.Dial("tcp", "localhost:"+dss.port); err != nil {
+ t.Errorf("Dial failed: %v", err)
+ } else {
+ if addr := c.LocalAddr().(*TCPAddr); addr.IP.To4() != nil {
+ dss.teardownNetwork("tcp4")
+ } else if addr.IP.To16() != nil && addr.IP.To4() == nil {
+ dss.teardownNetwork("tcp6")
+ }
+ c.Close()
+ }
+ }
+}
}
}
+func TestDialGoogle(t *testing.T) {
+ if testing.Short() || !*testExternal {
+ t.Skip("skipping test to avoid external network")
+ }
+
+ d := &Dialer{DualStack: true}
+ for _, network := range []string{"tcp", "tcp4", "tcp6"} {
+ if network == "tcp" && !supportsIPv4 && !supportsIPv6 {
+ t.Logf("skipping test; both ipv4 and ipv6 are not supported")
+ continue
+ } else if network == "tcp4" && !supportsIPv4 {
+ t.Logf("skipping test; ipv4 is not supported")
+ continue
+ } else if network == "tcp6" && !supportsIPv6 {
+ t.Logf("skipping test; ipv6 is not supported")
+ continue
+ } else if network == "tcp6" && !*testIPv6 {
+ t.Logf("test disabled; use -ipv6 to enable")
+ continue
+ }
+ if c, err := d.Dial(network, "www.google.com:http"); err != nil {
+ t.Errorf("Dial failed: %v", err)
+ } else {
+ c.Close()
+ }
+ }
+}
+
// fd is already connected to the destination, port 80.
// Run an HTTP request to fetch the appropriate page.
func fetchGoogle(t *testing.T, fd Conn, network, addr string) {
func sysInit() {
}
-func resolveAndDial(net, addr string, localAddr Addr, deadline time.Time) (Conn, error) {
+func dial(net string, ra Addr, dialer func(time.Time) (Conn, error), deadline time.Time) (Conn, error) {
// On plan9, use the relatively inefficient
// goroutine-racing implementation.
- return resolveAndDialChannel(net, addr, localAddr, deadline)
+ return dialChannel(net, ra, dialer, deadline)
}
func newFD(proto, name string, ctl, data *os.File, laddr, raddr Addr) *netFD {
func sysInit() {
}
-func resolveAndDial(net, addr string, localAddr Addr, deadline time.Time) (Conn, error) {
- ra, err := resolveAddr("dial", net, addr, deadline)
- if err != nil {
- return nil, &OpError{Op: "dial", Net: net, Addr: nil, Err: err}
- }
- return dial(net, addr, localAddr, ra.toAddr(), deadline)
+func dial(network string, ra Addr, dialer func(time.Time) (Conn, error), deadline time.Time) (Conn, error) {
+ return dialer(deadline)
}
func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
return syscall.LoadConnectEx() == nil
}
-func resolveAndDial(net, addr string, localAddr Addr, deadline time.Time) (Conn, error) {
+func dial(net string, ra Addr, dialer func(time.Time) (Conn, error), deadline time.Time) (Conn, error) {
if !canUseConnectEx(net) {
// Use the relatively inefficient goroutine-racing
// implementation of DialTimeout.
- return resolveAndDialChannel(net, addr, localAddr, deadline)
+ return dialChannel(net, ra, dialer, deadline)
}
- ra, err := resolveAddr("dial", net, addr, deadline)
- if err != nil {
- return nil, &OpError{Op: "dial", Net: net, Addr: nil, Err: err}
- }
- return dial(net, addr, localAddr, ra.toAddr(), deadline)
+ return dialer(deadline)
}
// operation contains superset of data necessary to perform all async IO.
--- /dev/null
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import "sync"
+
+type streamListener struct {
+ net, addr string
+ ln Listener
+}
+
+type dualStackServer struct {
+ lnmu sync.RWMutex
+ lns []streamListener
+ port string
+
+ cmu sync.RWMutex
+ cs []Conn // established connections at the passive open side
+}
+
+func (dss *dualStackServer) buildup(server func(*dualStackServer, Listener)) error {
+ for i := range dss.lns {
+ go server(dss, dss.lns[i].ln)
+ }
+ return nil
+}
+
+func (dss *dualStackServer) putConn(c Conn) error {
+ dss.cmu.Lock()
+ dss.cs = append(dss.cs, c)
+ dss.cmu.Unlock()
+ return nil
+}
+
+func (dss *dualStackServer) teardownNetwork(net string) error {
+ dss.lnmu.Lock()
+ for i := range dss.lns {
+ if net == dss.lns[i].net && dss.lns[i].ln != nil {
+ dss.lns[i].ln.Close()
+ dss.lns[i].ln = nil
+ }
+ }
+ dss.lnmu.Unlock()
+ return nil
+}
+
+func (dss *dualStackServer) teardown() error {
+ dss.lnmu.Lock()
+ for i := range dss.lns {
+ if dss.lns[i].ln != nil {
+ dss.lns[i].ln.Close()
+ }
+ }
+ dss.lnmu.Unlock()
+ dss.cmu.Lock()
+ for _, c := range dss.cs {
+ c.Close()
+ }
+ dss.cmu.Unlock()
+ return nil
+}
+
+func newDualStackServer(lns []streamListener) (*dualStackServer, error) {
+ dss := &dualStackServer{lns: lns, port: "0"}
+ for i := range dss.lns {
+ ln, err := Listen(dss.lns[i].net, dss.lns[i].addr+":"+dss.port)
+ if err != nil {
+ dss.teardown()
+ return nil, err
+ }
+ dss.lns[i].ln = ln
+ if dss.port == "0" {
+ if _, dss.port, err = SplitHostPort(ln.Addr().String()); err != nil {
+ dss.teardown()
+ return nil, err
+ }
+ }
+ }
+ return dss, nil
+}