
client: add close method for client pool #797

Merged
merged 9 commits on Jun 22, 2023
1 change: 1 addition & 0 deletions client/common_test.go
@@ -25,6 +25,7 @@ func Test(t *testing.T) {
for _, seg := range segs {
Suite(&clientTestSuite{port: seg})
Suite(&connTestSuite{port: seg})
Suite(&poolTestSuite{port: seg})
}
TestingT(t)
}
57 changes: 47 additions & 10 deletions client/pool.go
@@ -42,6 +42,9 @@ type (
}

readyConnection chan Connection
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

ConnectionStats struct {
@@ -116,15 +119,19 @@ func NewPool(
readyConnection: make(chan Connection),
}

pool.ctx, pool.cancel = context.WithCancel(context.Background())

pool.synchro.idleConnections = make([]Connection, 0, pool.maxIdle)

pool.wg.Add(1)
go pool.newConnectionProducer()

if pool.minAlive > 0 {
pool.logFunc(`Pool: Setup %d new connections (minimal pool size)...`, pool.minAlive)
pool.startNewConnections(pool.minAlive)
}

pool.wg.Add(1)
go pool.closeOldIdleConnections()

return pool
@@ -143,6 +150,9 @@ func (pool *Pool) GetStats(stats *ConnectionStats) {
// GetConn returns connection from the pool or create new
func (pool *Pool) GetConn(ctx context.Context) (*Conn, error) {
for {
if pool.ctx.Err() != nil {
return nil, errors.Errorf("failed to get conn, pool closed")
}
connection, err := pool.getConnection(ctx)
if err != nil {
return nil, err
@@ -224,6 +234,7 @@ func (pool *Pool) putConnectionUnsafe(connection Connection) {
}

func (pool *Pool) newConnectionProducer() {
Collaborator

I found another problem, please help me check it.
At line 271, pool.readyConnection <- connection may block the for loop. So if Close tries to drain pool.readyConnection first and newConnectionProducer then sends on the channel, the two will deadlock.

Generally, I think a select with a <-pool.ctx.Done() branch alongside pool.readyConnection <- connection will help avoid the blocking. And pool.ctx.Err() != nil can replace pool.closed.

Contributor Author

Replacing pool.closed with ctx.Err is a good idea; I will update it that way. Thanks.
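
To make the suggestion concrete, here is a minimal standalone Go sketch of the pattern (simplified names and a plain int channel, not the code merged below): the producer selects between sending on its channel and the context's Done channel, so a Close-style cancel unblocks a pending send instead of deadlocking.

package main

import (
	"context"
	"fmt"
	"sync"
)

// producer keeps sending values on ready until ctx is cancelled.
// The select keeps the send from blocking forever once the consumer stops reading.
func producer(ctx context.Context, ready chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; ; i++ {
		select {
		case ready <- i:
		case <-ctx.Done():
			return // cancelled: drop the pending value and exit
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ready := make(chan int)

	var wg sync.WaitGroup
	wg.Add(1)
	go producer(ctx, ready, &wg)

	fmt.Println("got:", <-ready) // consume one value

	// Close-style shutdown: cancel first, then wait. Without the ctx.Done()
	// branch the producer could be stuck on a send and wg.Wait() would deadlock.
	cancel()
	wg.Wait()
	fmt.Println("producer stopped cleanly")
}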

defer pool.wg.Done()
var connection Connection
var err error

@@ -256,8 +267,12 @@
continue
}
}

pool.readyConnection <- connection
select {
case pool.readyConnection <- connection:
case <-pool.ctx.Done():
pool.closeConn(connection.conn)
return
}
}
}

@@ -293,19 +308,25 @@ func (pool *Pool) getIdleConnectionUnsafe() Connection {
}

func (pool *Pool) closeOldIdleConnections() {
defer pool.wg.Done()
var toPing []Connection

ticker := time.NewTicker(5 * time.Second)

for range ticker.C {
toPing = pool.getOldIdleConnections(toPing[:0])
if len(toPing) == 0 {
continue
}
pool.recheckConnections(toPing)
for {
select {
case <-pool.ctx.Done():
return
case <-ticker.C:
toPing = pool.getOldIdleConnections(toPing[:0])
if len(toPing) == 0 {
continue
}
pool.recheckConnections(toPing)

if !pool.spawnConnectionsIfNeeded() {
pool.closeIdleConnectionsIfCan()
if !pool.spawnConnectionsIfNeeded() {
pool.closeIdleConnectionsIfCan()
}
}
}
}
@@ -475,3 +496,19 @@ func (pool *Pool) ping(conn *Conn) error {
}
return err
}

// Close only shuts down the idle connections; connections that are not in the pool cannot be controlled from here.
// So before calling Close, call PutConn to put every in-use connection back into the pool first.
func (pool *Pool) Close() {
pool.cancel()
// wait for the newConnectionProducer and closeOldIdleConnections goroutines to exit.
pool.wg.Wait()
// close idle connections
pool.synchro.Lock()
for _, connection := range pool.synchro.idleConnections {
pool.synchro.stats.TotalCount--
_ = connection.conn.Close()
}
pool.synchro.idleConnections = nil
pool.synchro.Unlock()
}
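
A hedged usage sketch of the contract described in the Close comment above (the address, credentials, and database name are placeholders, not values from this PR): check out a connection, return it with PutConn, then Close the pool so the idle connections are shut down.

package main

import (
	"context"

	"github.com/go-mysql-org/go-mysql/client"
	"github.com/siddontang/go-log/log"
)

func main() {
	// Placeholder address and credentials; adjust for your environment.
	pool := client.NewPool(log.Debugf, 5, 10, 5, "127.0.0.1:3306", "root", "", "test")

	conn, err := pool.GetConn(context.Background())
	if err != nil {
		panic(err)
	}

	// ... use conn ...

	// Return the connection so it becomes idle and Close can reach it.
	pool.PutConn(conn)

	// Close cancels the pool context, waits for the background goroutines,
	// and closes every idle connection.
	pool.Close()
}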
31 changes: 31 additions & 0 deletions client/pool_test.go
@@ -0,0 +1,31 @@
package client

import (
"context"
"fmt"

"github.com/go-mysql-org/go-mysql/test_util"
. "github.com/pingcap/check"
"github.com/siddontang/go-log/log"
)

type poolTestSuite struct {
port string
}

func (s poolTestSuite) TestPool_Close(c *C) {
addr := fmt.Sprintf("%s:%s", *test_util.MysqlHost, s.port)
pool := NewPool(log.Debugf, 5, 10, 5, addr, *testUser, *testPassword, "")
conn, err := pool.GetConn(context.Background())
c.Assert(err, IsNil)
err = conn.Ping()
c.Assert(err, IsNil)
pool.PutConn(conn)
pool.Close()
var poolStats ConnectionStats
pool.GetStats(&poolStats)
c.Assert(poolStats.TotalCount, Equals, 0)
c.Assert(pool.readyConnection, HasLen, 0)
_, err = pool.GetConn(context.Background())
c.Assert(err, NotNil)
}