Commit 6de40099 authored by Chris Hines's avatar Chris Hines Committed by Brad Fitzpatrick

database/sql: avoid deadlock waiting for connections

Previously with db.maxOpen > 0, db.maxOpen+n failed connection attempts
started concurrently could result in a deadlock. DB.conn and
DB.openNewConnection did not trigger the DB.connectionOpener go routine
after a failed connection attempt. This omission could leave go routines
waiting for DB.connectionOpener forever.

In addition the logic to track the state of the pool was inconsistent.
db.numOpen was sometimes incremented optimistically and sometimes not.
This change harmonizes the logic and eliminates the db.pendingOpens
variable, making the logic easier to understand and maintain.

Fixes #10886

Change-Id: I983c4921a3dacfbd531c3d7f8d2da8a592e9922a
Reviewed-on: https://go-review.googlesource.com/14547Reviewed-by: default avatarBrad Fitzpatrick <bradfitz@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
parent 19aa4209
...@@ -153,12 +153,32 @@ func TestDrivers(t *testing.T) { ...@@ -153,12 +153,32 @@ func TestDrivers(t *testing.T) {
} }
} }
// hook to simulate connection failures
var hookOpenErr struct {
sync.Mutex
fn func() error
}
func setHookOpenErr(fn func() error) {
hookOpenErr.Lock()
defer hookOpenErr.Unlock()
hookOpenErr.fn = fn
}
// Supports dsn forms: // Supports dsn forms:
// <dbname> // <dbname>
// <dbname>;<opts> (only currently supported option is `badConn`, // <dbname>;<opts> (only currently supported option is `badConn`,
// which causes driver.ErrBadConn to be returned on // which causes driver.ErrBadConn to be returned on
// every other conn.Begin()) // every other conn.Begin())
func (d *fakeDriver) Open(dsn string) (driver.Conn, error) { func (d *fakeDriver) Open(dsn string) (driver.Conn, error) {
hookOpenErr.Lock()
fn := hookOpenErr.fn
hookOpenErr.Unlock()
if fn != nil {
if err := fn(); err != nil {
return nil, err
}
}
parts := strings.Split(dsn, ";") parts := strings.Split(dsn, ";")
if len(parts) < 1 { if len(parts) < 1 {
return nil, errors.New("fakedb: no database name") return nil, errors.New("fakedb: no database name")
......
...@@ -229,8 +229,7 @@ type DB struct { ...@@ -229,8 +229,7 @@ type DB struct {
mu sync.Mutex // protects following fields mu sync.Mutex // protects following fields
freeConn []*driverConn freeConn []*driverConn
connRequests []chan connRequest connRequests []chan connRequest
numOpen int numOpen int // number of opened and pending open connections
pendingOpens int
// Used to signal the need for new connections // Used to signal the need for new connections
// a goroutine running connectionOpener() reads on this chan and // a goroutine running connectionOpener() reads on this chan and
// maybeOpenNewConnections sends on the chan (one send per needed connection) // maybeOpenNewConnections sends on the chan (one send per needed connection)
...@@ -615,15 +614,15 @@ func (db *DB) Stats() DBStats { ...@@ -615,15 +614,15 @@ func (db *DB) Stats() DBStats {
// If there are connRequests and the connection limit hasn't been reached, // If there are connRequests and the connection limit hasn't been reached,
// then tell the connectionOpener to open new connections. // then tell the connectionOpener to open new connections.
func (db *DB) maybeOpenNewConnections() { func (db *DB) maybeOpenNewConnections() {
numRequests := len(db.connRequests) - db.pendingOpens numRequests := len(db.connRequests)
if db.maxOpen > 0 { if db.maxOpen > 0 {
numCanOpen := db.maxOpen - (db.numOpen + db.pendingOpens) numCanOpen := db.maxOpen - db.numOpen
if numRequests > numCanOpen { if numRequests > numCanOpen {
numRequests = numCanOpen numRequests = numCanOpen
} }
} }
for numRequests > 0 { for numRequests > 0 {
db.pendingOpens++ db.numOpen++ // optimistically
numRequests-- numRequests--
db.openerCh <- struct{}{} db.openerCh <- struct{}{}
} }
...@@ -638,6 +637,9 @@ func (db *DB) connectionOpener() { ...@@ -638,6 +637,9 @@ func (db *DB) connectionOpener() {
// Open one new connection // Open one new connection
func (db *DB) openNewConnection() { func (db *DB) openNewConnection() {
// maybeOpenNewConnctions has already executed db.numOpen++ before it sent
// on db.openerCh. This function must execute db.numOpen-- if the
// connection fails or is closed before returning.
ci, err := db.driver.Open(db.dsn) ci, err := db.driver.Open(db.dsn)
db.mu.Lock() db.mu.Lock()
defer db.mu.Unlock() defer db.mu.Unlock()
...@@ -645,11 +647,13 @@ func (db *DB) openNewConnection() { ...@@ -645,11 +647,13 @@ func (db *DB) openNewConnection() {
if err == nil { if err == nil {
ci.Close() ci.Close()
} }
db.numOpen--
return return
} }
db.pendingOpens--
if err != nil { if err != nil {
db.numOpen--
db.putConnDBLocked(nil, err) db.putConnDBLocked(nil, err)
db.maybeOpenNewConnections()
return return
} }
dc := &driverConn{ dc := &driverConn{
...@@ -658,8 +662,8 @@ func (db *DB) openNewConnection() { ...@@ -658,8 +662,8 @@ func (db *DB) openNewConnection() {
} }
if db.putConnDBLocked(dc, err) { if db.putConnDBLocked(dc, err) {
db.addDepLocked(dc, dc) db.addDepLocked(dc, dc)
db.numOpen++
} else { } else {
db.numOpen--
ci.Close() ci.Close()
} }
} }
...@@ -701,7 +705,10 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) { ...@@ -701,7 +705,10 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
req := make(chan connRequest, 1) req := make(chan connRequest, 1)
db.connRequests = append(db.connRequests, req) db.connRequests = append(db.connRequests, req)
db.mu.Unlock() db.mu.Unlock()
ret := <-req ret, ok := <-req
if !ok {
return nil, errDBClosed
}
return ret.conn, ret.err return ret.conn, ret.err
} }
...@@ -711,6 +718,7 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) { ...@@ -711,6 +718,7 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
if err != nil { if err != nil {
db.mu.Lock() db.mu.Lock()
db.numOpen-- // correct for earlier optimism db.numOpen-- // correct for earlier optimism
db.maybeOpenNewConnections()
db.mu.Unlock() db.mu.Unlock()
return nil, err return nil, err
} }
......
...@@ -1159,6 +1159,67 @@ func TestMaxOpenConnsOnBusy(t *testing.T) { ...@@ -1159,6 +1159,67 @@ func TestMaxOpenConnsOnBusy(t *testing.T) {
} }
} }
// Issue 10886: tests that all connection attempts return when more than
// DB.maxOpen connections are in flight and the first DB.maxOpen fail.
func TestPendingConnsAfterErr(t *testing.T) {
const (
maxOpen = 2
tryOpen = maxOpen*2 + 2
)
db := newTestDB(t, "people")
defer closeDB(t, db)
defer func() {
for k, v := range db.lastPut {
t.Logf("%p: %v", k, v)
}
}()
db.SetMaxOpenConns(maxOpen)
db.SetMaxIdleConns(0)
errOffline := errors.New("db offline")
defer func() { setHookOpenErr(nil) }()
errs := make(chan error, tryOpen)
unblock := make(chan struct{})
setHookOpenErr(func() error {
<-unblock // block until all connections are in flight
return errOffline
})
var opening sync.WaitGroup
opening.Add(tryOpen)
for i := 0; i < tryOpen; i++ {
go func() {
opening.Done() // signal one connection is in flight
_, err := db.Exec("INSERT|people|name=Julia,age=19")
errs <- err
}()
}
opening.Wait() // wait for all workers to begin running
time.Sleep(10 * time.Millisecond) // make extra sure all workers are blocked
close(unblock) // let all workers proceed
const timeout = 100 * time.Millisecond
to := time.NewTimer(timeout)
defer to.Stop()
// check that all connections fail without deadlock
for i := 0; i < tryOpen; i++ {
select {
case err := <-errs:
if got, want := err, errOffline; got != want {
t.Errorf("unexpected err: got %v, want %v", got, want)
}
case <-to.C:
t.Fatalf("orphaned connection request(s), still waiting after %v", timeout)
}
}
}
func TestSingleOpenConn(t *testing.T) { func TestSingleOpenConn(t *testing.T) {
db := newTestDB(t, "people") db := newTestDB(t, "people")
defer closeDB(t, db) defer closeDB(t, db)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment