}
}
+// hook to simulate connection failures
+var hookOpenErr struct {
+ sync.Mutex
+ fn func() error
+}
+
+func setHookOpenErr(fn func() error) {
+ hookOpenErr.Lock()
+ defer hookOpenErr.Unlock()
+ hookOpenErr.fn = fn
+}
+
// Supports dsn forms:
// <dbname>
// <dbname>;<opts> (only currently supported option is `badConn`,
// which causes driver.ErrBadConn to be returned on
// every other conn.Begin())
func (d *fakeDriver) Open(dsn string) (driver.Conn, error) {
+ hookOpenErr.Lock()
+ fn := hookOpenErr.fn
+ hookOpenErr.Unlock()
+ if fn != nil {
+ if err := fn(); err != nil {
+ return nil, err
+ }
+ }
parts := strings.Split(dsn, ";")
if len(parts) < 1 {
return nil, errors.New("fakedb: no database name")
mu sync.Mutex // protects following fields
freeConn []*driverConn
connRequests []chan connRequest
- numOpen int
- pendingOpens int
+ numOpen int // number of opened and pending open connections
// Used to signal the need for new connections
// a goroutine running connectionOpener() reads on this chan and
// maybeOpenNewConnections sends on the chan (one send per needed connection)
// If there are connRequests and the connection limit hasn't been reached,
// then tell the connectionOpener to open new connections.
func (db *DB) maybeOpenNewConnections() {
- numRequests := len(db.connRequests) - db.pendingOpens
+ numRequests := len(db.connRequests)
if db.maxOpen > 0 {
- numCanOpen := db.maxOpen - (db.numOpen + db.pendingOpens)
+ numCanOpen := db.maxOpen - db.numOpen
if numRequests > numCanOpen {
numRequests = numCanOpen
}
}
for numRequests > 0 {
- db.pendingOpens++
+ db.numOpen++ // optimistically
numRequests--
db.openerCh <- struct{}{}
}
// Open one new connection
func (db *DB) openNewConnection() {
+ // maybeOpenNewConnctions has already executed db.numOpen++ before it sent
+ // on db.openerCh. This function must execute db.numOpen-- if the
+ // connection fails or is closed before returning.
ci, err := db.driver.Open(db.dsn)
db.mu.Lock()
defer db.mu.Unlock()
if err == nil {
ci.Close()
}
+ db.numOpen--
return
}
- db.pendingOpens--
if err != nil {
+ db.numOpen--
db.putConnDBLocked(nil, err)
+ db.maybeOpenNewConnections()
return
}
dc := &driverConn{
}
if db.putConnDBLocked(dc, err) {
db.addDepLocked(dc, dc)
- db.numOpen++
} else {
+ db.numOpen--
ci.Close()
}
}
req := make(chan connRequest, 1)
db.connRequests = append(db.connRequests, req)
db.mu.Unlock()
- ret := <-req
+ ret, ok := <-req
+ if !ok {
+ return nil, errDBClosed
+ }
return ret.conn, ret.err
}
if err != nil {
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
+ db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
}
}
+// Issue 10886: tests that all connection attempts return when more than
+// DB.maxOpen connections are in flight and the first DB.maxOpen fail.
+func TestPendingConnsAfterErr(t *testing.T) {
+ const (
+ maxOpen = 2
+ tryOpen = maxOpen*2 + 2
+ )
+
+ db := newTestDB(t, "people")
+ defer closeDB(t, db)
+ defer func() {
+ for k, v := range db.lastPut {
+ t.Logf("%p: %v", k, v)
+ }
+ }()
+
+ db.SetMaxOpenConns(maxOpen)
+ db.SetMaxIdleConns(0)
+
+ errOffline := errors.New("db offline")
+ defer func() { setHookOpenErr(nil) }()
+
+ errs := make(chan error, tryOpen)
+
+ unblock := make(chan struct{})
+ setHookOpenErr(func() error {
+ <-unblock // block until all connections are in flight
+ return errOffline
+ })
+
+ var opening sync.WaitGroup
+ opening.Add(tryOpen)
+ for i := 0; i < tryOpen; i++ {
+ go func() {
+ opening.Done() // signal one connection is in flight
+ _, err := db.Exec("INSERT|people|name=Julia,age=19")
+ errs <- err
+ }()
+ }
+
+ opening.Wait() // wait for all workers to begin running
+ time.Sleep(10 * time.Millisecond) // make extra sure all workers are blocked
+ close(unblock) // let all workers proceed
+
+ const timeout = 100 * time.Millisecond
+ to := time.NewTimer(timeout)
+ defer to.Stop()
+
+ // check that all connections fail without deadlock
+ for i := 0; i < tryOpen; i++ {
+ select {
+ case err := <-errs:
+ if got, want := err, errOffline; got != want {
+ t.Errorf("unexpected err: got %v, want %v", got, want)
+ }
+ case <-to.C:
+ t.Fatalf("orphaned connection request(s), still waiting after %v", timeout)
+ }
+ }
+}
+
func TestSingleOpenConn(t *testing.T) {
db := newTestDB(t, "people")
defer closeDB(t, db)