From 3a00487a442dce822212ca41b2d77c654093727e Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 2 Sep 2019 09:57:34 +0800 Subject: [PATCH 1/5] refine usage of MaxReconnectAttempts in BinlogSyncer --- replication/binlogsyncer.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 51caaa15c..2547ab542 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -83,7 +83,8 @@ type BinlogSyncerConfig struct { // read timeout ReadTimeout time.Duration - // maximum number of attempts to re-establish a broken connection + // maximum number of attempts to re-establish a broken connection, zero means no retry, any negative number + // means infinite retry. MaxReconnectAttempts int // Only works when MySQL/MariaDB variable binlog_checksum=CRC32. @@ -646,6 +647,11 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { } for { + if b.cfg.MaxReconnectAttempts >= 0 && b.retryCount >= b.cfg.MaxReconnectAttempts { + log.Errorf("retry sync exceeded max retries (%d)", b.cfg.MaxReconnectAttempts) + s.closeWithError(err) + return + } select { case <-b.ctx.Done(): s.close() @@ -653,12 +659,6 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { case <-time.After(time.Second): b.retryCount++ if err = b.retrySync(); err != nil { - if b.cfg.MaxReconnectAttempts > 0 && b.retryCount >= b.cfg.MaxReconnectAttempts { - log.Errorf("retry sync err: %v, exceeded max retries (%d)", err, b.cfg.MaxReconnectAttempts) - s.closeWithError(err) - return - } - log.Errorf("retry sync err: %v, wait 1s and retry again", err) continue } From eba6a01b2b759ac218aec081d70c52f3069d03e6 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 2 Sep 2019 10:29:38 +0800 Subject: [PATCH 2/5] fix ci --- replication/replication_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/replication/replication_test.go b/replication/replication_test.go index 6f3e2fd71..fbe9c801a 100644 --- a/replication/replication_test.go +++ b/replication/replication_test.go @@ -285,13 +285,14 @@ func (t *testSyncerSuite) setupTest(c *C, flavor string) { } cfg := BinlogSyncerConfig{ - ServerID: 100, - Flavor: flavor, - Host: *testHost, - Port: port, - User: "root", - Password: "", - UseDecimal: true, + ServerID: 100, + Flavor: flavor, + Host: *testHost, + Port: port, + User: "root", + Password: "", + UseDecimal: true, + MaxReconnectAttempts: 1, } t.b = NewBinlogSyncer(cfg) From ff2bec2f2645da0c2ababe8698d573decbd49002 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 2 Sep 2019 10:53:37 +0800 Subject: [PATCH 3/5] add DisableRetrySync, keep old logic --- replication/binlogsyncer.go | 17 +++++++++++++---- replication/replication_test.go | 33 ++++++++++++++++----------------- 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 2547ab542..22a64b307 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -83,10 +83,13 @@ type BinlogSyncerConfig struct { // read timeout ReadTimeout time.Duration - // maximum number of attempts to re-establish a broken connection, zero means no retry, any negative number - // means infinite retry. + // maximum number of attempts to re-establish a broken connection, zero means infinite retry + // will not work if DisableRetrySync is true MaxReconnectAttempts int + // whether disable re-sync for broken connection + DisableRetrySync bool + // Only works when MySQL/MariaDB variable binlog_checksum=CRC32. // For MySQL, binlog_checksum was introduced since 5.6.2, but CRC32 was set as default value since 5.6.6 . // https://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.html#option_mysqld_binlog-checksum @@ -647,8 +650,8 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { } for { - if b.cfg.MaxReconnectAttempts >= 0 && b.retryCount >= b.cfg.MaxReconnectAttempts { - log.Errorf("retry sync exceeded max retries (%d)", b.cfg.MaxReconnectAttempts) + if b.cfg.DisableRetrySync { + log.Warn("retry sync is disabled") s.closeWithError(err) return } @@ -659,6 +662,12 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { case <-time.After(time.Second): b.retryCount++ if err = b.retrySync(); err != nil { + if b.cfg.MaxReconnectAttempts > 0 && b.retryCount >= b.cfg.MaxReconnectAttempts { + log.Errorf("retry sync err: %v, exceeded max retries (%d)", err, b.cfg.MaxReconnectAttempts) + s.closeWithError(err) + return + } + log.Errorf("retry sync err: %v, wait 1s and retry again", err) continue } diff --git a/replication/replication_test.go b/replication/replication_test.go index fbe9c801a..e93e87abb 100644 --- a/replication/replication_test.go +++ b/replication/replication_test.go @@ -158,8 +158,8 @@ func (t *testSyncerSuite) testSync(c *C, s *BinlogStreamer) { t.testExecute(c, "DROP TABLE IF EXISTS test_json_v2") str = `CREATE TABLE test_json_v2 ( - id INT, - c JSON, + id INT, + c JSON, PRIMARY KEY (id) ) ENGINE=InnoDB` @@ -236,20 +236,20 @@ func (t *testSyncerSuite) testSync(c *C, s *BinlogStreamer) { // Must allow zero time. t.testExecute(c, `SET sql_mode=''`) str = `CREATE TABLE test_parse_time ( - a1 DATETIME, - a2 DATETIME(3), - a3 DATETIME(6), - b1 TIMESTAMP, - b2 TIMESTAMP(3) , + a1 DATETIME, + a2 DATETIME(3), + a3 DATETIME(6), + b1 TIMESTAMP, + b2 TIMESTAMP(3) , b3 TIMESTAMP(6))` t.testExecute(c, str) t.testExecute(c, `INSERT INTO test_parse_time VALUES - ("2014-09-08 17:51:04.123456", "2014-09-08 17:51:04.123456", "2014-09-08 17:51:04.123456", + ("2014-09-08 17:51:04.123456", "2014-09-08 17:51:04.123456", "2014-09-08 17:51:04.123456", "2014-09-08 17:51:04.123456","2014-09-08 17:51:04.123456","2014-09-08 17:51:04.123456"), ("0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000"), - ("2014-09-08 17:51:04.000456", "2014-09-08 17:51:04.000456", "2014-09-08 17:51:04.000456", + ("2014-09-08 17:51:04.000456", "2014-09-08 17:51:04.000456", "2014-09-08 17:51:04.000456", "2014-09-08 17:51:04.000456","2014-09-08 17:51:04.000456","2014-09-08 17:51:04.000456")`) t.wg.Wait() @@ -285,14 +285,13 @@ func (t *testSyncerSuite) setupTest(c *C, flavor string) { } cfg := BinlogSyncerConfig{ - ServerID: 100, - Flavor: flavor, - Host: *testHost, - Port: port, - User: "root", - Password: "", - UseDecimal: true, - MaxReconnectAttempts: 1, + ServerID: 100, + Flavor: flavor, + Host: *testHost, + Port: port, + User: "root", + Password: "", + UseDecimal: true, } t.b = NewBinlogSyncer(cfg) From 260d9c5376d250e99a62012baaefeb0906da32d6 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 2 Sep 2019 10:55:37 +0800 Subject: [PATCH 4/5] revert test file --- replication/replication_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/replication/replication_test.go b/replication/replication_test.go index e93e87abb..6f3e2fd71 100644 --- a/replication/replication_test.go +++ b/replication/replication_test.go @@ -158,8 +158,8 @@ func (t *testSyncerSuite) testSync(c *C, s *BinlogStreamer) { t.testExecute(c, "DROP TABLE IF EXISTS test_json_v2") str = `CREATE TABLE test_json_v2 ( - id INT, - c JSON, + id INT, + c JSON, PRIMARY KEY (id) ) ENGINE=InnoDB` @@ -236,20 +236,20 @@ func (t *testSyncerSuite) testSync(c *C, s *BinlogStreamer) { // Must allow zero time. t.testExecute(c, `SET sql_mode=''`) str = `CREATE TABLE test_parse_time ( - a1 DATETIME, - a2 DATETIME(3), - a3 DATETIME(6), - b1 TIMESTAMP, - b2 TIMESTAMP(3) , + a1 DATETIME, + a2 DATETIME(3), + a3 DATETIME(6), + b1 TIMESTAMP, + b2 TIMESTAMP(3) , b3 TIMESTAMP(6))` t.testExecute(c, str) t.testExecute(c, `INSERT INTO test_parse_time VALUES - ("2014-09-08 17:51:04.123456", "2014-09-08 17:51:04.123456", "2014-09-08 17:51:04.123456", + ("2014-09-08 17:51:04.123456", "2014-09-08 17:51:04.123456", "2014-09-08 17:51:04.123456", "2014-09-08 17:51:04.123456","2014-09-08 17:51:04.123456","2014-09-08 17:51:04.123456"), ("0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000"), - ("2014-09-08 17:51:04.000456", "2014-09-08 17:51:04.000456", "2014-09-08 17:51:04.000456", + ("2014-09-08 17:51:04.000456", "2014-09-08 17:51:04.000456", "2014-09-08 17:51:04.000456", "2014-09-08 17:51:04.000456","2014-09-08 17:51:04.000456","2014-09-08 17:51:04.000456")`) t.wg.Wait() From 1ca3b58eb635d68df358405fd0694778008977b2 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 2 Sep 2019 11:00:39 +0800 Subject: [PATCH 5/5] refine code and comment --- replication/binlogsyncer.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 22a64b307..3a5a60643 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -83,11 +83,11 @@ type BinlogSyncerConfig struct { // read timeout ReadTimeout time.Duration - // maximum number of attempts to re-establish a broken connection, zero means infinite retry - // will not work if DisableRetrySync is true + // maximum number of attempts to re-establish a broken connection, zero or negative number means infinite retry. + // this configuration will not work if DisableRetrySync is true MaxReconnectAttempts int - // whether disable re-sync for broken connection + // whether disable re-sync for broken connection DisableRetrySync bool // Only works when MySQL/MariaDB variable binlog_checksum=CRC32. @@ -649,12 +649,13 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { return } + if b.cfg.DisableRetrySync { + log.Warn("retry sync is disabled") + s.closeWithError(err) + return + } + for { - if b.cfg.DisableRetrySync { - log.Warn("retry sync is disabled") - s.closeWithError(err) - return - } select { case <-b.ctx.Done(): s.close()