From 1a8e6a952d5d65a898c34b5d688bb6b75fe368c5 Mon Sep 17 00:00:00 2001 From: Hao <1358137595@qq.com> Date: Thu, 15 Jun 2023 20:36:33 +0800 Subject: [PATCH 1/7] client: add close method for client pool --- client/common_test.go | 1 + client/pool.go | 31 +++++++++++++++++++++++++++++++ client/pool_test.go | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+) create mode 100644 client/pool_test.go diff --git a/client/common_test.go b/client/common_test.go index ec516a73d..d28e63753 100644 --- a/client/common_test.go +++ b/client/common_test.go @@ -25,6 +25,7 @@ func Test(t *testing.T) { for _, seg := range segs { Suite(&clientTestSuite{port: seg}) Suite(&connTestSuite{port: seg}) + Suite(&poolTestSuite{port: seg}) } TestingT(t) } diff --git a/client/pool.go b/client/pool.go index 263d52773..569d8a838 100644 --- a/client/pool.go +++ b/client/pool.go @@ -5,6 +5,7 @@ import ( "math" "math/rand" "sync" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -42,6 +43,7 @@ type ( } readyConnection chan Connection + closed uint32 } ConnectionStats struct { @@ -143,6 +145,9 @@ func (pool *Pool) GetStats(stats *ConnectionStats) { // GetConn returns connection from the pool or create new func (pool *Pool) GetConn(ctx context.Context) (*Conn, error) { for { + if atomic.LoadUint32(&pool.closed) == 1 { + return nil, errors.Errorf("failed get conn, pool closed") + } connection, err := pool.getConnection(ctx) if err != nil { return nil, err @@ -228,6 +233,9 @@ func (pool *Pool) newConnectionProducer() { var err error for { + if atomic.LoadUint32(&pool.closed) == 1 { + return + } connection.conn = nil pool.synchro.Lock() @@ -475,3 +483,26 @@ func (pool *Pool) ping(conn *Conn) error { } return err } + +// Close only shutdown idle connections. we couldn't control the connection which not in the pool. +// So before call Close, Call PutConn to put all connections that in use back to connection pool first. +func (pool *Pool) Close() { + //already closed, return. + if !atomic.CompareAndSwapUint32(&pool.closed, 0, 1) { + return + } + //close idle connections + pool.synchro.Lock() + for _, connection := range pool.synchro.idleConnections { + pool.synchro.stats.TotalCount-- + _ = connection.conn.Close() + } + pool.synchro.idleConnections = nil + pool.synchro.Unlock() + //close connection in ready + select { + case connection := <-pool.readyConnection: + pool.closeConn(connection.conn) + default: + } +} diff --git a/client/pool_test.go b/client/pool_test.go new file mode 100644 index 000000000..45942c228 --- /dev/null +++ b/client/pool_test.go @@ -0,0 +1,32 @@ +package client + +import ( + "context" + "fmt" + + "github.com/go-mysql-org/go-mysql/test_util" + . "github.com/pingcap/check" + "github.com/siddontang/go-log/log" +) + +type poolTestSuite struct { + c *Conn + port string +} + +func (s poolTestSuite) TestPool_Close(c *C) { + addr := fmt.Sprintf("%s:%s", *test_util.MysqlHost, s.port) + pool := NewPool(log.Debugf, 5, 10, 5, addr, *testUser, *testPassword, "") + conn, err := pool.GetConn(context.Background()) + c.Assert(err, IsNil) + err = conn.Ping() + c.Assert(err, IsNil) + pool.PutConn(conn) + pool.Close() + var poolStats ConnectionStats + pool.GetStats(&poolStats) + c.Assert(poolStats.TotalCount, Equals, 0) + c.Assert(pool.readyConnection, HasLen, 0) + conn, err = pool.GetConn(context.Background()) + c.Assert(err, NotNil) +} From de658e4aa8aac0f03636327a23b126cd5f368043 Mon Sep 17 00:00:00 2001 From: Hao <1358137595@qq.com> Date: Fri, 16 Jun 2023 10:50:21 +0800 Subject: [PATCH 2/7] client: remove unused var --- client/pool_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/client/pool_test.go b/client/pool_test.go index 45942c228..8d6e0c05a 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -10,7 +10,6 @@ import ( ) type poolTestSuite struct { - c *Conn port string } @@ -27,6 +26,6 @@ func (s poolTestSuite) TestPool_Close(c *C) { pool.GetStats(&poolStats) c.Assert(poolStats.TotalCount, Equals, 0) c.Assert(pool.readyConnection, HasLen, 0) - conn, err = pool.GetConn(context.Background()) + _, err = pool.GetConn(context.Background()) c.Assert(err, NotNil) } From 6853f7693ed028b8294a5c06fc606e87cd5f4b2b Mon Sep 17 00:00:00 2001 From: Hao <1358137595@qq.com> Date: Fri, 16 Jun 2023 10:52:12 +0800 Subject: [PATCH 3/7] client: close background goroutine when pool closed --- client/pool.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/client/pool.go b/client/pool.go index 569d8a838..a13798619 100644 --- a/client/pool.go +++ b/client/pool.go @@ -306,6 +306,9 @@ func (pool *Pool) closeOldIdleConnections() { ticker := time.NewTicker(5 * time.Second) for range ticker.C { + if atomic.LoadUint32(&pool.closed) == 1 { + return + } toPing = pool.getOldIdleConnections(toPing[:0]) if len(toPing) == 0 { continue From a629c76f2911123144b24da0b9681f592570a266 Mon Sep 17 00:00:00 2001 From: Hao <1358137595@qq.com> Date: Mon, 19 Jun 2023 14:15:21 +0800 Subject: [PATCH 4/7] Update pool.go client: wait connection producer exit in pool.Close() --- client/pool.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/client/pool.go b/client/pool.go index a13798619..b67ec5470 100644 --- a/client/pool.go +++ b/client/pool.go @@ -44,6 +44,7 @@ type ( readyConnection chan Connection closed uint32 + wg sync.WaitGroup } ConnectionStats struct { @@ -120,6 +121,7 @@ func NewPool( pool.synchro.idleConnections = make([]Connection, 0, pool.maxIdle) + pool.wg.Add(1) go pool.newConnectionProducer() if pool.minAlive > 0 { @@ -229,6 +231,7 @@ func (pool *Pool) putConnectionUnsafe(connection Connection) { } func (pool *Pool) newConnectionProducer() { + defer pool.wg.Done() var connection Connection var err error @@ -494,6 +497,16 @@ func (pool *Pool) Close() { if !atomic.CompareAndSwapUint32(&pool.closed, 0, 1) { return } + //close connection in ready. after we drain readyConnection channel, newConnectionProducer will not block on + //readyConnection channel and when finding pool closed it will exit. + select { + case connection := <-pool.readyConnection: + pool.closeConn(connection.conn) + default: + } + + //wait newConnectionProducer exit for other cases except blocking on readyConnection channel. + pool.wg.Wait() //close idle connections pool.synchro.Lock() for _, connection := range pool.synchro.idleConnections { @@ -502,10 +515,4 @@ func (pool *Pool) Close() { } pool.synchro.idleConnections = nil pool.synchro.Unlock() - //close connection in ready - select { - case connection := <-pool.readyConnection: - pool.closeConn(connection.conn) - default: - } } From 28bf4b4b331256b784c61c746fa479470509d8ab Mon Sep 17 00:00:00 2001 From: Hao <1358137595@qq.com> Date: Mon, 19 Jun 2023 15:04:40 +0800 Subject: [PATCH 5/7] Update resp.go client: fix ci --- client/resp.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/client/resp.go b/client/resp.go index dfdfea3b7..ad72deff6 100644 --- a/client/resp.go +++ b/client/resp.go @@ -344,7 +344,7 @@ func (c *Conn) readResultColumns(result *Result) (err error) { rawPkgLen := len(result.RawPkg) result.RawPkg, err = c.ReadPacketReuseMem(result.RawPkg) if err != nil { - return + return err } data = result.RawPkg[rawPkgLen:] @@ -361,7 +361,7 @@ func (c *Conn) readResultColumns(result *Result) (err error) { err = ErrMalformPacket } - return + return err } if result.Fields[i] == nil { @@ -369,7 +369,7 @@ func (c *Conn) readResultColumns(result *Result) (err error) { } err = result.Fields[i].Parse(data) if err != nil { - return + return err } result.FieldNames[hack.String(result.Fields[i].Name)] = i @@ -385,7 +385,7 @@ func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) { rawPkgLen := len(result.RawPkg) result.RawPkg, err = c.ReadPacketReuseMem(result.RawPkg) if err != nil { - return + return err } data = result.RawPkg[rawPkgLen:] @@ -434,7 +434,7 @@ func (c *Conn) readResultRowsStreaming(result *Result, isBinary bool, perRowCb S for { data, err = c.ReadPacketReuseMem(data[:0]) if err != nil { - return + return err } // EOF Packet From 76c71d8ddefea7eba7dd8d06bae1095f6cf07d6a Mon Sep 17 00:00:00 2001 From: Hao <1358137595@qq.com> Date: Tue, 20 Jun 2023 11:19:32 +0800 Subject: [PATCH 6/7] client: replace pool.closed with ctx.Err --- client/pool.go | 36 ++++++++++++++---------------------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/client/pool.go b/client/pool.go index b67ec5470..2b60b9f54 100644 --- a/client/pool.go +++ b/client/pool.go @@ -5,7 +5,6 @@ import ( "math" "math/rand" "sync" - "sync/atomic" "time" "github.com/pingcap/errors" @@ -43,7 +42,8 @@ type ( } readyConnection chan Connection - closed uint32 + ctx context.Context + cancel context.CancelFunc wg sync.WaitGroup } @@ -119,6 +119,8 @@ func NewPool( readyConnection: make(chan Connection), } + pool.ctx, pool.cancel = context.WithCancel(context.Background()) + pool.synchro.idleConnections = make([]Connection, 0, pool.maxIdle) pool.wg.Add(1) @@ -147,7 +149,7 @@ func (pool *Pool) GetStats(stats *ConnectionStats) { // GetConn returns connection from the pool or create new func (pool *Pool) GetConn(ctx context.Context) (*Conn, error) { for { - if atomic.LoadUint32(&pool.closed) == 1 { + if pool.ctx.Err() != nil { return nil, errors.Errorf("failed get conn, pool closed") } connection, err := pool.getConnection(ctx) @@ -236,9 +238,6 @@ func (pool *Pool) newConnectionProducer() { var err error for { - if atomic.LoadUint32(&pool.closed) == 1 { - return - } connection.conn = nil pool.synchro.Lock() @@ -267,8 +266,12 @@ func (pool *Pool) newConnectionProducer() { continue } } - - pool.readyConnection <- connection + select { + case pool.readyConnection <- connection: + case <-pool.ctx.Done(): + pool.closeConn(connection.conn) + return + } } } @@ -309,7 +312,7 @@ func (pool *Pool) closeOldIdleConnections() { ticker := time.NewTicker(5 * time.Second) for range ticker.C { - if atomic.LoadUint32(&pool.closed) == 1 { + if pool.ctx.Err() != nil { return } toPing = pool.getOldIdleConnections(toPing[:0]) @@ -493,19 +496,8 @@ func (pool *Pool) ping(conn *Conn) error { // Close only shutdown idle connections. we couldn't control the connection which not in the pool. // So before call Close, Call PutConn to put all connections that in use back to connection pool first. func (pool *Pool) Close() { - //already closed, return. - if !atomic.CompareAndSwapUint32(&pool.closed, 0, 1) { - return - } - //close connection in ready. after we drain readyConnection channel, newConnectionProducer will not block on - //readyConnection channel and when finding pool closed it will exit. - select { - case connection := <-pool.readyConnection: - pool.closeConn(connection.conn) - default: - } - - //wait newConnectionProducer exit for other cases except blocking on readyConnection channel. + pool.cancel() + //wait newConnectionProducer exit. pool.wg.Wait() //close idle connections pool.synchro.Lock() From 5af6a308202b219aa32fb7c47834fe95451357e7 Mon Sep 17 00:00:00 2001 From: Hao <1358137595@qq.com> Date: Thu, 22 Jun 2023 18:13:58 +0800 Subject: [PATCH 7/7] client: close background groutine when client pool closed --- client/pool.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/client/pool.go b/client/pool.go index 2b60b9f54..315307c5a 100644 --- a/client/pool.go +++ b/client/pool.go @@ -131,6 +131,7 @@ func NewPool( pool.startNewConnections(pool.minAlive) } + pool.wg.Add(1) go pool.closeOldIdleConnections() return pool @@ -307,22 +308,25 @@ func (pool *Pool) getIdleConnectionUnsafe() Connection { } func (pool *Pool) closeOldIdleConnections() { + defer pool.wg.Done() var toPing []Connection ticker := time.NewTicker(5 * time.Second) - for range ticker.C { - if pool.ctx.Err() != nil { + for { + select { + case <-pool.ctx.Done(): return - } - toPing = pool.getOldIdleConnections(toPing[:0]) - if len(toPing) == 0 { - continue - } - pool.recheckConnections(toPing) + case <-ticker.C: + toPing = pool.getOldIdleConnections(toPing[:0]) + if len(toPing) == 0 { + continue + } + pool.recheckConnections(toPing) - if !pool.spawnConnectionsIfNeeded() { - pool.closeIdleConnectionsIfCan() + if !pool.spawnConnectionsIfNeeded() { + pool.closeIdleConnectionsIfCan() + } } } }