From: INADA Naoki Date: Fri, 23 Jan 2015 11:02:37 +0000 (+0900) Subject: database/sql: reduce lock contention in Stmt.connStmt X-Git-Tag: go1.5beta1~2274 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=1b61a97811626d4c7a8332c107f1e091253d1b2e;p=gostls13.git database/sql: reduce lock contention in Stmt.connStmt Previouslly, Stmt.connStmt calls DB.connIfFree on each Stmt.css. Since Stmt.connStmt locks Stmt.mu, a concurrent use of Stmt causes lock contention on Stmt.mu. Additionally, DB.connIfFree locks DB.mu which is shared by DB.addDep and DB.removeDep. This change removes DB.connIfFree and makes use of a first unused connection in idle connection pool to reduce lock contention without making it complicated. Fixes #9484 On EC2 c3.8xlarge (E5-2680 v2 @ 2.80GHz * 32 vCPU): benchmark old ns/op new ns/op delta BenchmarkManyConcurrentQuery-8 40249 34721 -13.73% BenchmarkManyConcurrentQuery-16 45610 40176 -11.91% BenchmarkManyConcurrentQuery-32 109831 43179 -60.69% benchmark old allocs new allocs delta BenchmarkManyConcurrentQuery-8 25 25 +0.00% BenchmarkManyConcurrentQuery-16 25 25 +0.00% BenchmarkManyConcurrentQuery-32 25 25 +0.00% benchmark old bytes new bytes delta BenchmarkManyConcurrentQuery-8 3980 3969 -0.28% BenchmarkManyConcurrentQuery-16 3980 3982 +0.05% BenchmarkManyConcurrentQuery-32 3993 3990 -0.08% Change-Id: Ic96296922c465bac38a260018c58324dae1531d9 Reviewed-on: https://go-review.googlesource.com/2207 Reviewed-by: Mikio Hara --- diff --git a/src/database/sql/sql.go b/src/database/sql/sql.go index 8db9c78571..1ce679d8a6 100644 --- a/src/database/sql/sql.go +++ b/src/database/sql/sql.go @@ -20,6 +20,7 @@ import ( "runtime" "sort" "sync" + "sync/atomic" ) var drivers = make(map[string]driver.Driver) @@ -211,6 +212,10 @@ var ErrNoRows = errors.New("sql: no rows in result set") type DB struct { driver driver.Driver dsn string + // numClosed is an atomic counter which represents a total number of + // closed connections. Stmt.openStmt checks it before cleaning closed + // connections in Stmt.css. + numClosed uint64 mu sync.Mutex // protects following fields freeConn []*driverConn @@ -246,7 +251,7 @@ type driverConn struct { // guarded by db.mu inUse bool onPut []func() // code (with db.mu held) run when conn is next returned - dbmuClosed bool // same as closed, but guarded by db.mu, for connIfFree + dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked } func (dc *driverConn) releaseConn(err error) { @@ -329,6 +334,7 @@ func (dc *driverConn) finalClose() error { dc.db.maybeOpenNewConnections() dc.db.mu.Unlock() + atomic.AddUint64(&dc.db.numClosed, 1) return err } @@ -683,42 +689,6 @@ var ( errConnBusy = errors.New("database/sql: internal sentinel error: conn is busy") ) -// connIfFree returns (wanted, nil) if wanted is still a valid conn and -// isn't in use. -// -// The error is errConnClosed if the connection if the requested connection -// is invalid because it's been closed. -// -// The error is errConnBusy if the connection is in use. -func (db *DB) connIfFree(wanted *driverConn) (*driverConn, error) { - db.mu.Lock() - defer db.mu.Unlock() - if wanted.dbmuClosed { - return nil, errConnClosed - } - if wanted.inUse { - return nil, errConnBusy - } - idx := -1 - for ii, v := range db.freeConn { - if v == wanted { - idx = ii - break - } - } - if idx >= 0 { - db.freeConn = append(db.freeConn[:idx], db.freeConn[idx+1:]...) - wanted.inUse = true - return wanted, nil - } - // TODO(bradfitz): shouldn't get here. After Go 1.1, change this to: - // panic("connIfFree call requested a non-closed, non-busy, non-free conn") - // Which passes all the tests, but I'm too paranoid to include this - // late in Go 1.1. - // Instead, treat it like a busy connection: - return nil, errConnBusy -} - // putConnHook is a hook for testing. var putConnHook func(*DB, *driverConn) @@ -856,9 +826,10 @@ func (db *DB) prepare(query string) (*Stmt, error) { return nil, err } stmt := &Stmt{ - db: db, - query: query, - css: []connStmt{{dc, si}}, + db: db, + query: query, + css: []connStmt{{dc, si}}, + lastNumClosed: atomic.LoadUint64(&db.numClosed), } db.addDep(stmt, stmt) db.putConn(dc, nil) @@ -1293,6 +1264,10 @@ type Stmt struct { // used if tx == nil and one is found that has idle // connections. If tx != nil, txsi is always used. css []connStmt + + // lastNumClosed is copied from db.numClosed when Stmt is created + // without tx and closed connections in css are removed. + lastNumClosed uint64 } // Exec executes a prepared statement with the given arguments and @@ -1346,6 +1321,32 @@ func resultFromStatement(ds driverStmt, args ...interface{}) (Result, error) { return driverResult{ds.Locker, resi}, nil } +// removeClosedStmtLocked removes closed conns in s.css. +// +// To avoid lock contention on DB.mu, we do it only when +// s.db.numClosed - s.lastNum is large enough. +func (s *Stmt) removeClosedStmtLocked() { + t := len(s.css)/2 + 1 + if t > 10 { + t = 10 + } + dbClosed := atomic.LoadUint64(&s.db.numClosed) + if dbClosed-s.lastNumClosed < uint64(t) { + return + } + + s.db.mu.Lock() + for i := 0; i < len(s.css); i++ { + if s.css[i].dc.dbmuClosed { + s.css[i] = s.css[len(s.css)-1] + s.css = s.css[:len(s.css)-1] + i-- + } + } + s.db.mu.Unlock() + s.lastNumClosed = dbClosed +} + // connStmt returns a free driver connection on which to execute the // statement, a function to call to release the connection, and a // statement bound to that connection. @@ -1372,35 +1373,15 @@ func (s *Stmt) connStmt() (ci *driverConn, releaseConn func(error), si driver.St return ci, releaseConn, s.txsi.si, nil } - for i := 0; i < len(s.css); i++ { - v := s.css[i] - _, err := s.db.connIfFree(v.dc) - if err == nil { - s.mu.Unlock() - return v.dc, v.dc.releaseConn, v.si, nil - } - if err == errConnClosed { - // Lazily remove dead conn from our freelist. - s.css[i] = s.css[len(s.css)-1] - s.css = s.css[:len(s.css)-1] - i-- - } - - } + s.removeClosedStmtLocked() s.mu.Unlock() - // If all connections are busy, either wait for one to become available (if - // we've already hit the maximum number of open connections) or create a - // new one. - // // TODO(bradfitz): or always wait for one? make configurable later? dc, err := s.db.conn() if err != nil { return nil, nil, nil, err } - // Do another pass over the list to see whether this statement has - // already been prepared on the connection assigned to us. s.mu.Lock() for _, v := range s.css { if v.dc == dc { diff --git a/src/database/sql/sql_test.go b/src/database/sql/sql_test.go index 34efdf254c..60bdefa076 100644 --- a/src/database/sql/sql_test.go +++ b/src/database/sql/sql_test.go @@ -1764,56 +1764,6 @@ func doConcurrentTest(t testing.TB, ct concurrentTest) { wg.Wait() } -func manyConcurrentQueries(t testing.TB) { - maxProcs, numReqs := 16, 500 - if testing.Short() { - maxProcs, numReqs = 4, 50 - } - defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(maxProcs)) - - db := newTestDB(t, "people") - defer closeDB(t, db) - - stmt, err := db.Prepare("SELECT|people|name|") - if err != nil { - t.Fatal(err) - } - defer stmt.Close() - - var wg sync.WaitGroup - wg.Add(numReqs) - - reqs := make(chan bool) - defer close(reqs) - - for i := 0; i < maxProcs*2; i++ { - go func() { - for range reqs { - rows, err := stmt.Query() - if err != nil { - t.Errorf("error on query: %v", err) - wg.Done() - continue - } - - var name string - for rows.Next() { - rows.Scan(&name) - } - rows.Close() - - wg.Done() - } - }() - } - - for i := 0; i < numReqs; i++ { - reqs <- true - } - - wg.Wait() -} - func TestIssue6081(t *testing.T) { db := newTestDB(t, "people") defer closeDB(t, db) @@ -1985,3 +1935,31 @@ func BenchmarkConcurrentRandom(b *testing.B) { doConcurrentTest(b, ct) } } + +func BenchmarkManyConcurrentQueries(b *testing.B) { + b.ReportAllocs() + // To see lock contention in Go 1.4, 16~ cores and 128~ goroutines are required. + const parallelism = 16 + + db := newTestDB(b, "magicquery") + defer closeDB(b, db) + db.SetMaxIdleConns(runtime.GOMAXPROCS(0) * parallelism) + + stmt, err := db.Prepare("SELECT|magicquery|op|op=?,millis=?") + if err != nil { + b.Fatal(err) + } + defer stmt.Close() + + b.SetParallelism(parallelism) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + rows, err := stmt.Query("sleep", 1) + if err != nil { + b.Error(err) + return + } + rows.Close() + } + }) +}