Skip to content

client: add close method for client pool #797

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 22, 2023
1 change: 1 addition & 0 deletions client/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func Test(t *testing.T) {
for _, seg := range segs {
Suite(&clientTestSuite{port: seg})
Suite(&connTestSuite{port: seg})
Suite(&poolTestSuite{port: seg})
}
TestingT(t)
}
34 changes: 34 additions & 0 deletions client/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -42,6 +43,7 @@ type (
}

readyConnection chan Connection
closed uint32
}

ConnectionStats struct {
Expand Down Expand Up @@ -143,6 +145,9 @@ func (pool *Pool) GetStats(stats *ConnectionStats) {
// GetConn returns connection from the pool or create new
func (pool *Pool) GetConn(ctx context.Context) (*Conn, error) {
for {
if atomic.LoadUint32(&pool.closed) == 1 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about your usage. Will you call Close concurrent with GetConn?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In normal condition, we won't call Close concurrent with GetConn. I add this line is in order to tell caller that pool was closed when someone calls GetConn after Close.

return nil, errors.Errorf("failed get conn, pool closed")
}
connection, err := pool.getConnection(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -228,6 +233,9 @@ func (pool *Pool) newConnectionProducer() {
var err error

for {
if atomic.LoadUint32(&pool.closed) == 1 {
return
}
connection.conn = nil

pool.synchro.Lock()
Expand Down Expand Up @@ -298,6 +306,9 @@ func (pool *Pool) closeOldIdleConnections() {
ticker := time.NewTicker(5 * time.Second)

for range ticker.C {
if atomic.LoadUint32(&pool.closed) == 1 {
return
}
toPing = pool.getOldIdleConnections(toPing[:0])
if len(toPing) == 0 {
continue
Expand Down Expand Up @@ -475,3 +486,26 @@ func (pool *Pool) ping(conn *Conn) error {
}
return err
}

// Close only shutdown idle connections. we couldn't control the connection which not in the pool.
// So before call Close, Call PutConn to put all connections that in use back to connection pool first.
func (pool *Pool) Close() {
//already closed, return.
if !atomic.CompareAndSwapUint32(&pool.closed, 0, 1) {
return
}
//close idle connections
pool.synchro.Lock()
for _, connection := range pool.synchro.idleConnections {
pool.synchro.stats.TotalCount--
_ = connection.conn.Close()
}
pool.synchro.idleConnections = nil
pool.synchro.Unlock()
//close connection in ready
select {
case connection := <-pool.readyConnection:
pool.closeConn(connection.conn)
default:
}
}
31 changes: 31 additions & 0 deletions client/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package client

import (
"context"
"fmt"

"github.com/go-mysql-org/go-mysql/test_util"
. "github.com/pingcap/check"
"github.com/siddontang/go-log/log"
)

type poolTestSuite struct {
port string
}

func (s poolTestSuite) TestPool_Close(c *C) {
addr := fmt.Sprintf("%s:%s", *test_util.MysqlHost, s.port)
pool := NewPool(log.Debugf, 5, 10, 5, addr, *testUser, *testPassword, "")
conn, err := pool.GetConn(context.Background())
c.Assert(err, IsNil)
err = conn.Ping()
c.Assert(err, IsNil)
pool.PutConn(conn)
pool.Close()
var poolStats ConnectionStats
pool.GetStats(&poolStats)
c.Assert(poolStats.TotalCount, Equals, 0)
c.Assert(pool.readyConnection, HasLen, 0)
_, err = pool.GetConn(context.Background())
c.Assert(err, NotNil)
}