]> Cypherpunks repositories - gostls13.git/commitdiff
database/sql: reduce lock contention in Stmt.connStmt
authorINADA Naoki <songofacandy@gmail.com>
Fri, 23 Jan 2015 11:02:37 +0000 (20:02 +0900)
committerMikio Hara <mikioh.mikioh@gmail.com>
Sat, 24 Jan 2015 09:56:25 +0000 (09:56 +0000)
Previouslly, Stmt.connStmt calls DB.connIfFree on each Stmt.css.
Since Stmt.connStmt locks Stmt.mu, a concurrent use of Stmt causes lock
contention on Stmt.mu.
Additionally, DB.connIfFree locks DB.mu which is shared by DB.addDep and
DB.removeDep.

This change removes DB.connIfFree and makes use of a first unused
connection in idle connection pool to reduce lock contention
without making it complicated.

Fixes #9484

On EC2 c3.8xlarge (E5-2680 v2 @ 2.80GHz * 32 vCPU):

benchmark                           old ns/op     new ns/op     delta
BenchmarkManyConcurrentQuery-8      40249         34721         -13.73%
BenchmarkManyConcurrentQuery-16     45610         40176         -11.91%
BenchmarkManyConcurrentQuery-32     109831        43179         -60.69%

benchmark                           old allocs     new allocs     delta
BenchmarkManyConcurrentQuery-8      25             25             +0.00%
BenchmarkManyConcurrentQuery-16     25             25             +0.00%
BenchmarkManyConcurrentQuery-32     25             25             +0.00%

benchmark                           old bytes     new bytes     delta
BenchmarkManyConcurrentQuery-8      3980          3969          -0.28%
BenchmarkManyConcurrentQuery-16     3980          3982          +0.05%
BenchmarkManyConcurrentQuery-32     3993          3990          -0.08%

Change-Id: Ic96296922c465bac38a260018c58324dae1531d9
Reviewed-on: https://go-review.googlesource.com/2207
Reviewed-by: Mikio Hara <mikioh.mikioh@gmail.com>
src/database/sql/sql.go
src/database/sql/sql_test.go

index 8db9c785713a33eb8bc84ec02e5dfcd223a6f615..1ce679d8a6edea29acfb268653799c2b03b89229 100644 (file)
@@ -20,6 +20,7 @@ import (
        "runtime"
        "sort"
        "sync"
+       "sync/atomic"
 )
 
 var drivers = make(map[string]driver.Driver)
@@ -211,6 +212,10 @@ var ErrNoRows = errors.New("sql: no rows in result set")
 type DB struct {
        driver driver.Driver
        dsn    string
+       // numClosed is an atomic counter which represents a total number of
+       // closed connections. Stmt.openStmt checks it before cleaning closed
+       // connections in Stmt.css.
+       numClosed uint64
 
        mu           sync.Mutex // protects following fields
        freeConn     []*driverConn
@@ -246,7 +251,7 @@ type driverConn struct {
        // guarded by db.mu
        inUse      bool
        onPut      []func() // code (with db.mu held) run when conn is next returned
-       dbmuClosed bool     // same as closed, but guarded by db.mu, for connIfFree
+       dbmuClosed bool     // same as closed, but guarded by db.mu, for removeClosedStmtLocked
 }
 
 func (dc *driverConn) releaseConn(err error) {
@@ -329,6 +334,7 @@ func (dc *driverConn) finalClose() error {
        dc.db.maybeOpenNewConnections()
        dc.db.mu.Unlock()
 
+       atomic.AddUint64(&dc.db.numClosed, 1)
        return err
 }
 
@@ -683,42 +689,6 @@ var (
        errConnBusy   = errors.New("database/sql: internal sentinel error: conn is busy")
 )
 
-// connIfFree returns (wanted, nil) if wanted is still a valid conn and
-// isn't in use.
-//
-// The error is errConnClosed if the connection if the requested connection
-// is invalid because it's been closed.
-//
-// The error is errConnBusy if the connection is in use.
-func (db *DB) connIfFree(wanted *driverConn) (*driverConn, error) {
-       db.mu.Lock()
-       defer db.mu.Unlock()
-       if wanted.dbmuClosed {
-               return nil, errConnClosed
-       }
-       if wanted.inUse {
-               return nil, errConnBusy
-       }
-       idx := -1
-       for ii, v := range db.freeConn {
-               if v == wanted {
-                       idx = ii
-                       break
-               }
-       }
-       if idx >= 0 {
-               db.freeConn = append(db.freeConn[:idx], db.freeConn[idx+1:]...)
-               wanted.inUse = true
-               return wanted, nil
-       }
-       // TODO(bradfitz): shouldn't get here. After Go 1.1, change this to:
-       // panic("connIfFree call requested a non-closed, non-busy, non-free conn")
-       // Which passes all the tests, but I'm too paranoid to include this
-       // late in Go 1.1.
-       // Instead, treat it like a busy connection:
-       return nil, errConnBusy
-}
-
 // putConnHook is a hook for testing.
 var putConnHook func(*DB, *driverConn)
 
@@ -856,9 +826,10 @@ func (db *DB) prepare(query string) (*Stmt, error) {
                return nil, err
        }
        stmt := &Stmt{
-               db:    db,
-               query: query,
-               css:   []connStmt{{dc, si}},
+               db:            db,
+               query:         query,
+               css:           []connStmt{{dc, si}},
+               lastNumClosed: atomic.LoadUint64(&db.numClosed),
        }
        db.addDep(stmt, stmt)
        db.putConn(dc, nil)
@@ -1293,6 +1264,10 @@ type Stmt struct {
        // used if tx == nil and one is found that has idle
        // connections.  If tx != nil, txsi is always used.
        css []connStmt
+
+       // lastNumClosed is copied from db.numClosed when Stmt is created
+       // without tx and closed connections in css are removed.
+       lastNumClosed uint64
 }
 
 // Exec executes a prepared statement with the given arguments and
@@ -1346,6 +1321,32 @@ func resultFromStatement(ds driverStmt, args ...interface{}) (Result, error) {
        return driverResult{ds.Locker, resi}, nil
 }
 
+// removeClosedStmtLocked removes closed conns in s.css.
+//
+// To avoid lock contention on DB.mu, we do it only when
+// s.db.numClosed - s.lastNum is large enough.
+func (s *Stmt) removeClosedStmtLocked() {
+       t := len(s.css)/2 + 1
+       if t > 10 {
+               t = 10
+       }
+       dbClosed := atomic.LoadUint64(&s.db.numClosed)
+       if dbClosed-s.lastNumClosed < uint64(t) {
+               return
+       }
+
+       s.db.mu.Lock()
+       for i := 0; i < len(s.css); i++ {
+               if s.css[i].dc.dbmuClosed {
+                       s.css[i] = s.css[len(s.css)-1]
+                       s.css = s.css[:len(s.css)-1]
+                       i--
+               }
+       }
+       s.db.mu.Unlock()
+       s.lastNumClosed = dbClosed
+}
+
 // connStmt returns a free driver connection on which to execute the
 // statement, a function to call to release the connection, and a
 // statement bound to that connection.
@@ -1372,35 +1373,15 @@ func (s *Stmt) connStmt() (ci *driverConn, releaseConn func(error), si driver.St
                return ci, releaseConn, s.txsi.si, nil
        }
 
-       for i := 0; i < len(s.css); i++ {
-               v := s.css[i]
-               _, err := s.db.connIfFree(v.dc)
-               if err == nil {
-                       s.mu.Unlock()
-                       return v.dc, v.dc.releaseConn, v.si, nil
-               }
-               if err == errConnClosed {
-                       // Lazily remove dead conn from our freelist.
-                       s.css[i] = s.css[len(s.css)-1]
-                       s.css = s.css[:len(s.css)-1]
-                       i--
-               }
-
-       }
+       s.removeClosedStmtLocked()
        s.mu.Unlock()
 
-       // If all connections are busy, either wait for one to become available (if
-       // we've already hit the maximum number of open connections) or create a
-       // new one.
-       //
        // TODO(bradfitz): or always wait for one? make configurable later?
        dc, err := s.db.conn()
        if err != nil {
                return nil, nil, nil, err
        }
 
-       // Do another pass over the list to see whether this statement has
-       // already been prepared on the connection assigned to us.
        s.mu.Lock()
        for _, v := range s.css {
                if v.dc == dc {
index 34efdf254c65309f9ea0c48a6921ff2a9add9f97..60bdefa07625f5c4431cbe096c0768e0c3f29a89 100644 (file)
@@ -1764,56 +1764,6 @@ func doConcurrentTest(t testing.TB, ct concurrentTest) {
        wg.Wait()
 }
 
-func manyConcurrentQueries(t testing.TB) {
-       maxProcs, numReqs := 16, 500
-       if testing.Short() {
-               maxProcs, numReqs = 4, 50
-       }
-       defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(maxProcs))
-
-       db := newTestDB(t, "people")
-       defer closeDB(t, db)
-
-       stmt, err := db.Prepare("SELECT|people|name|")
-       if err != nil {
-               t.Fatal(err)
-       }
-       defer stmt.Close()
-
-       var wg sync.WaitGroup
-       wg.Add(numReqs)
-
-       reqs := make(chan bool)
-       defer close(reqs)
-
-       for i := 0; i < maxProcs*2; i++ {
-               go func() {
-                       for range reqs {
-                               rows, err := stmt.Query()
-                               if err != nil {
-                                       t.Errorf("error on query:  %v", err)
-                                       wg.Done()
-                                       continue
-                               }
-
-                               var name string
-                               for rows.Next() {
-                                       rows.Scan(&name)
-                               }
-                               rows.Close()
-
-                               wg.Done()
-                       }
-               }()
-       }
-
-       for i := 0; i < numReqs; i++ {
-               reqs <- true
-       }
-
-       wg.Wait()
-}
-
 func TestIssue6081(t *testing.T) {
        db := newTestDB(t, "people")
        defer closeDB(t, db)
@@ -1985,3 +1935,31 @@ func BenchmarkConcurrentRandom(b *testing.B) {
                doConcurrentTest(b, ct)
        }
 }
+
+func BenchmarkManyConcurrentQueries(b *testing.B) {
+       b.ReportAllocs()
+       // To see lock contention in Go 1.4, 16~ cores and 128~ goroutines are required.
+       const parallelism = 16
+
+       db := newTestDB(b, "magicquery")
+       defer closeDB(b, db)
+       db.SetMaxIdleConns(runtime.GOMAXPROCS(0) * parallelism)
+
+       stmt, err := db.Prepare("SELECT|magicquery|op|op=?,millis=?")
+       if err != nil {
+               b.Fatal(err)
+       }
+       defer stmt.Close()
+
+       b.SetParallelism(parallelism)
+       b.RunParallel(func(pb *testing.PB) {
+               for pb.Next() {
+                       rows, err := stmt.Query("sleep", 1)
+                       if err != nil {
+                               b.Error(err)
+                               return
+                       }
+                       rows.Close()
+               }
+       })
+}