]> Cypherpunks repositories - gostls13.git/commitdiff
database/sql: avoid deadlock waiting for connections
authorChris Hines <chris.cs.guy@gmail.com>
Mon, 14 Sep 2015 07:44:56 +0000 (03:44 -0400)
committerBrad Fitzpatrick <bradfitz@golang.org>
Fri, 16 Oct 2015 15:17:03 +0000 (15:17 +0000)
Previously with db.maxOpen > 0, db.maxOpen+n failed connection attempts
started concurrently could result in a deadlock. DB.conn and
DB.openNewConnection did not trigger the DB.connectionOpener go routine
after a failed connection attempt. This omission could leave go routines
waiting for DB.connectionOpener forever.

In addition the logic to track the state of the pool was inconsistent.
db.numOpen was sometimes incremented optimistically and sometimes not.
This change harmonizes the logic and eliminates the db.pendingOpens
variable, making the logic easier to understand and maintain.

Fixes #10886

Change-Id: I983c4921a3dacfbd531c3d7f8d2da8a592e9922a
Reviewed-on: https://go-review.googlesource.com/14547
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>

src/database/sql/fakedb_test.go
src/database/sql/sql.go
src/database/sql/sql_test.go

index 112f280ec5364ab8600ac46df9ac1f549f6c65b0..f1e8f6cb6ed7dff3f17a86ab0e06565eb5b30a56 100644 (file)
@@ -153,12 +153,32 @@ func TestDrivers(t *testing.T) {
        }
 }
 
+// hook to simulate connection failures
+var hookOpenErr struct {
+       sync.Mutex
+       fn func() error
+}
+
+func setHookOpenErr(fn func() error) {
+       hookOpenErr.Lock()
+       defer hookOpenErr.Unlock()
+       hookOpenErr.fn = fn
+}
+
 // Supports dsn forms:
 //    <dbname>
 //    <dbname>;<opts>  (only currently supported option is `badConn`,
 //                      which causes driver.ErrBadConn to be returned on
 //                      every other conn.Begin())
 func (d *fakeDriver) Open(dsn string) (driver.Conn, error) {
+       hookOpenErr.Lock()
+       fn := hookOpenErr.fn
+       hookOpenErr.Unlock()
+       if fn != nil {
+               if err := fn(); err != nil {
+                       return nil, err
+               }
+       }
        parts := strings.Split(dsn, ";")
        if len(parts) < 1 {
                return nil, errors.New("fakedb: no database name")
index fbb0e594a5a6b63abba8e72bc3c1ca1d2e15dc5f..f3fed953ad316214d0bd5aacd4430e09f5be9c62 100644 (file)
@@ -229,8 +229,7 @@ type DB struct {
        mu           sync.Mutex // protects following fields
        freeConn     []*driverConn
        connRequests []chan connRequest
-       numOpen      int
-       pendingOpens int
+       numOpen      int // number of opened and pending open connections
        // Used to signal the need for new connections
        // a goroutine running connectionOpener() reads on this chan and
        // maybeOpenNewConnections sends on the chan (one send per needed connection)
@@ -615,15 +614,15 @@ func (db *DB) Stats() DBStats {
 // If there are connRequests and the connection limit hasn't been reached,
 // then tell the connectionOpener to open new connections.
 func (db *DB) maybeOpenNewConnections() {
-       numRequests := len(db.connRequests) - db.pendingOpens
+       numRequests := len(db.connRequests)
        if db.maxOpen > 0 {
-               numCanOpen := db.maxOpen - (db.numOpen + db.pendingOpens)
+               numCanOpen := db.maxOpen - db.numOpen
                if numRequests > numCanOpen {
                        numRequests = numCanOpen
                }
        }
        for numRequests > 0 {
-               db.pendingOpens++
+               db.numOpen++ // optimistically
                numRequests--
                db.openerCh <- struct{}{}
        }
@@ -638,6 +637,9 @@ func (db *DB) connectionOpener() {
 
 // Open one new connection
 func (db *DB) openNewConnection() {
+       // maybeOpenNewConnctions has already executed db.numOpen++ before it sent
+       // on db.openerCh. This function must execute db.numOpen-- if the
+       // connection fails or is closed before returning.
        ci, err := db.driver.Open(db.dsn)
        db.mu.Lock()
        defer db.mu.Unlock()
@@ -645,11 +647,13 @@ func (db *DB) openNewConnection() {
                if err == nil {
                        ci.Close()
                }
+               db.numOpen--
                return
        }
-       db.pendingOpens--
        if err != nil {
+               db.numOpen--
                db.putConnDBLocked(nil, err)
+               db.maybeOpenNewConnections()
                return
        }
        dc := &driverConn{
@@ -658,8 +662,8 @@ func (db *DB) openNewConnection() {
        }
        if db.putConnDBLocked(dc, err) {
                db.addDepLocked(dc, dc)
-               db.numOpen++
        } else {
+               db.numOpen--
                ci.Close()
        }
 }
@@ -701,7 +705,10 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
                req := make(chan connRequest, 1)
                db.connRequests = append(db.connRequests, req)
                db.mu.Unlock()
-               ret := <-req
+               ret, ok := <-req
+               if !ok {
+                       return nil, errDBClosed
+               }
                return ret.conn, ret.err
        }
 
@@ -711,6 +718,7 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
        if err != nil {
                db.mu.Lock()
                db.numOpen-- // correct for earlier optimism
+               db.maybeOpenNewConnections()
                db.mu.Unlock()
                return nil, err
        }
index e1063bbc6b4509ae9c4ecfc15bbb5ceaf093631f..d835bc160a46fed3e52f6970b6eef29b9ad5e4c3 100644 (file)
@@ -1159,6 +1159,67 @@ func TestMaxOpenConnsOnBusy(t *testing.T) {
        }
 }
 
+// Issue 10886: tests that all connection attempts return when more than
+// DB.maxOpen connections are in flight and the first DB.maxOpen fail.
+func TestPendingConnsAfterErr(t *testing.T) {
+       const (
+               maxOpen = 2
+               tryOpen = maxOpen*2 + 2
+       )
+
+       db := newTestDB(t, "people")
+       defer closeDB(t, db)
+       defer func() {
+               for k, v := range db.lastPut {
+                       t.Logf("%p: %v", k, v)
+               }
+       }()
+
+       db.SetMaxOpenConns(maxOpen)
+       db.SetMaxIdleConns(0)
+
+       errOffline := errors.New("db offline")
+       defer func() { setHookOpenErr(nil) }()
+
+       errs := make(chan error, tryOpen)
+
+       unblock := make(chan struct{})
+       setHookOpenErr(func() error {
+               <-unblock // block until all connections are in flight
+               return errOffline
+       })
+
+       var opening sync.WaitGroup
+       opening.Add(tryOpen)
+       for i := 0; i < tryOpen; i++ {
+               go func() {
+                       opening.Done() // signal one connection is in flight
+                       _, err := db.Exec("INSERT|people|name=Julia,age=19")
+                       errs <- err
+               }()
+       }
+
+       opening.Wait()                    // wait for all workers to begin running
+       time.Sleep(10 * time.Millisecond) // make extra sure all workers are blocked
+       close(unblock)                    // let all workers proceed
+
+       const timeout = 100 * time.Millisecond
+       to := time.NewTimer(timeout)
+       defer to.Stop()
+
+       // check that all connections fail without deadlock
+       for i := 0; i < tryOpen; i++ {
+               select {
+               case err := <-errs:
+                       if got, want := err, errOffline; got != want {
+                               t.Errorf("unexpected err: got %v, want %v", got, want)
+                       }
+               case <-to.C:
+                       t.Fatalf("orphaned connection request(s), still waiting after %v", timeout)
+               }
+       }
+}
+
 func TestSingleOpenConn(t *testing.T) {
        db := newTestDB(t, "people")
        defer closeDB(t, db)