return nil, err
}
- return db.queryDC(ctx, dc, dc.releaseConn, query, args)
+ return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
}
// queryDC executes a query on the given connection.
// The connection gets released by the releaseConn function.
-func (db *DB) queryDC(ctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
+// The ctx context is from a query method and the txctx context is from an
+// optional transaction context.
+func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
if queryer, ok := dc.ci.(driver.Queryer); ok {
dargs, err := driverArgs(dc.ci, nil, args)
if err != nil {
releaseConn: releaseConn,
rowsi: rowsi,
}
- rows.initContextClose(ctx)
+ rows.initContextClose(ctx, txctx)
return rows, nil
}
}
rowsi: rowsi,
closeStmt: ds,
}
- rows.initContextClose(ctx)
+ rows.initContextClose(ctx, txctx)
return rows, nil
}
}
c.closemu.RLock()
- return c.db.queryDC(ctx, dc, c.closemuRUnlockCondReleaseConn, query, args)
+ return c.db.queryDC(ctx, nil, dc, c.closemuRUnlockCondReleaseConn, query, args)
}
// QueryRowContext executes a query that is expected to return at most one row.
// close returns the connection to the pool and
// must only be called by Tx.rollback or Tx.Commit.
func (tx *Tx) close(err error) {
+ tx.cancel()
+
tx.closemu.Lock()
defer tx.closemu.Unlock()
tx.releaseConn(err)
- tx.cancel()
tx.dc = nil
tx.txi = nil
}
return nil, err
}
- return tx.db.queryDC(ctx, dc, tx.closemuRUnlockRelease, query, args)
+ return tx.db.queryDC(ctx, tx.ctx, dc, tx.closemuRUnlockRelease, query, args)
}
// Query executes a query that returns rows, typically a SELECT.
releaseConn(err)
s.db.removeDep(s, rows)
}
- rows.initContextClose(ctx)
+ var txctx context.Context
+ if s.tx != nil {
+ txctx = s.tx.ctx
+ }
+ rows.initContextClose(ctx, txctx)
return rows, nil
}
lastcols []driver.Value
}
-func (rs *Rows) initContextClose(ctx context.Context) {
+func (rs *Rows) initContextClose(ctx, txctx context.Context) {
ctx, rs.cancel = context.WithCancel(ctx)
- go rs.awaitDone(ctx)
+ go rs.awaitDone(ctx, txctx)
}
-// awaitDone blocks until the rows are closed or the context canceled.
-func (rs *Rows) awaitDone(ctx context.Context) {
- <-ctx.Done()
+// awaitDone blocks until either ctx or txctx is canceled. The ctx is provided
+// from the query context and is canceled when the query Rows is closed.
+// If the query was issued in a transaction, the transaction's context
+// is also provided in txctx to ensure Rows is closed if the Tx is closed.
+func (rs *Rows) awaitDone(ctx, txctx context.Context) {
+ var txctxDone <-chan struct{}
+ if txctx != nil {
+ txctxDone = txctx.Done()
+ }
+ select {
+ case <-ctx.Done():
+ case <-txctxDone:
+ }
rs.close(ctx.Err())
}
}
}
+// TestIssue20575 ensures the Rows from query does not block
+// closing a transaction. Ensure Rows is closed while closing a trasaction.
+func TestIssue20575(t *testing.T) {
+ db := newTestDB(t, "people")
+ tx, err := db.Begin()
+ if err != nil {
+ t.Fatal(err)
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+ defer cancel()
+ _, err = tx.QueryContext(ctx, "SELECT|people|age,name|")
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Do not close Rows from QueryContext.
+ err = tx.Rollback()
+ if err != nil {
+ t.Fatal(err)
+ }
+ select {
+ default:
+ case <-ctx.Done():
+ t.Fatal("timeout: failed to rollback query without closing rows:", ctx.Err())
+ }
+}
+
// golang.org/issue/5718
func TestErrBadConnReconnect(t *testing.T) {
db := newTestDB(t, "foo")