From b2355538ca6ba724070f86770fccd37604caddc9 Mon Sep 17 00:00:00 2001 From: Victor Luchits Date: Mon, 27 Jun 2022 13:24:02 +0300 Subject: [PATCH 1/3] Add support for custom dialer in canal and binlog syncer --- canal/canal.go | 12 +++++++++++- canal/config.go | 8 ++++++++ client/conn.go | 4 ++++ replication/binlogsyncer.go | 17 ++++++++++++++--- 4 files changed, 37 insertions(+), 4 deletions(-) diff --git a/canal/canal.go b/canal/canal.go index 4d9676964..fd80079f4 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -427,6 +427,7 @@ func (c *Canal) prepareSyncer() error { TimestampStringLocation: c.cfg.TimestampStringLocation, TLSConfig: c.cfg.TLSConfig, Logger: c.cfg.Logger, + Dialer: c.cfg.Dialer, } if strings.Contains(c.cfg.Addr, "/") { @@ -451,6 +452,14 @@ func (c *Canal) prepareSyncer() error { return nil } +func (c *Canal) connect(options ...func(*client.Conn)) (*client.Conn, error) { + ctx, cancel := context.WithTimeout(c.ctx, time.Second*10) + defer cancel() + + return client.ConnectWithDialer(ctx, "", c.cfg.Addr, + c.cfg.User, c.cfg.Password, "", c.cfg.Dialer, options...) +} + // Execute a SQL func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error) { c.connLock.Lock() @@ -461,10 +470,11 @@ func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err conn.SetTLSConfig(c.cfg.TLSConfig) }) } + retryNum := 3 for i := 0; i < retryNum; i++ { if c.conn == nil { - c.conn, err = client.Connect(c.cfg.Addr, c.cfg.User, c.cfg.Password, "", argF...) + c.conn, err = c.connect(argF...) if err != nil { return nil, errors.Trace(err) } diff --git a/canal/config.go b/canal/config.go index f87357e42..4f4a8ea53 100644 --- a/canal/config.go +++ b/canal/config.go @@ -4,10 +4,12 @@ import ( "crypto/tls" "io/ioutil" "math/rand" + "net" "os" "time" "github.com/BurntSushi/toml" + "github.com/go-mysql-org/go-mysql/client" "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/errors" "github.com/siddontang/go-log/log" @@ -91,6 +93,9 @@ type Config struct { //Set Logger Logger *log.Logger + + //Set Dialer + Dialer client.Dialer } func NewConfigWithFile(name string) (*Config, error) { @@ -132,5 +137,8 @@ func NewDefaultConfig() *Config { streamHandler, _ := log.NewStreamHandler(os.Stdout) c.Logger = log.NewDefault(streamHandler) + dialer := &net.Dialer{} + c.Dialer = dialer.DialContext + return c } diff --git a/client/conn.go b/client/conn.go index 5cbe30025..13ff6794d 100644 --- a/client/conn.go +++ b/client/conn.go @@ -78,6 +78,10 @@ type Dialer func(ctx context.Context, network, address string) (net.Conn, error) func ConnectWithDialer(ctx context.Context, network string, addr string, user string, password string, dbName string, dialer Dialer, options ...func(*Conn)) (*Conn, error) { c := new(Conn) + if network == "" { + network = getNetProto(addr) + } + var err error conn, err := dialer(ctx, network, addr) if err != nil { diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index aa5ac747a..196eb0870 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -112,6 +112,9 @@ type BinlogSyncerConfig struct { // Set Logger Logger *log.Logger + + // Set Dialer + Dialer client.Dialer } // BinlogSyncer syncs binlog event from server. @@ -149,6 +152,10 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { if cfg.ServerID == 0 { cfg.Logger.Fatal("can't use 0 as the server ID") } + if cfg.Dialer == nil { + dialer := &net.Dialer{} + cfg.Dialer = dialer.DialContext + } // Clear the Password to avoid outputing it in log. pass := cfg.Password @@ -864,9 +871,13 @@ func (b *BinlogSyncer) newConnection() (*client.Conn, error) { addr = b.cfg.Host } - return client.Connect(addr, b.cfg.User, b.cfg.Password, "", func(c *client.Conn) { - c.SetTLSConfig(b.cfg.TLSConfig) - }) + ctx, cancel := context.WithTimeout(b.ctx, time.Second*10) + defer cancel() + + return client.ConnectWithDialer(ctx, "", addr, b.cfg.User, b.cfg.Password, + "", b.cfg.Dialer, func(c *client.Conn) { + c.SetTLSConfig(b.cfg.TLSConfig) + }) } func (b *BinlogSyncer) killConnection(conn *client.Conn, id uint32) { From d2aaa2a78f2ab8c5f4c329b36279b3f945b4e402 Mon Sep 17 00:00:00 2001 From: Victor Luchits Date: Mon, 27 Jun 2022 13:45:09 +0300 Subject: [PATCH 2/3] Hush golinter --- canal/canal.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/canal/canal.go b/canal/canal.go index fd80079f4..fec6040d8 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -481,17 +481,17 @@ func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err } rr, err = c.conn.Execute(cmd, args...) - if err != nil && !mysql.ErrorEqual(err, mysql.ErrBadConn) { - return - } else if mysql.ErrorEqual(err, mysql.ErrBadConn) { - c.conn.Close() - c.conn = nil - continue - } else { - return + if err != nil { + if mysql.ErrorEqual(err, mysql.ErrBadConn) { + c.conn.Close() + c.conn = nil + continue + } + return nil, err } + break } - return + return rr, err } func (c *Canal) SyncedPosition() mysql.Position { From 2ac83fd1d4584557ece0b631aed282f099dc88fc Mon Sep 17 00:00:00 2001 From: Victor Luchits Date: Tue, 28 Jun 2022 16:08:48 +0300 Subject: [PATCH 3/3] Minor code refactoring --- client/conn.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/client/conn.go b/client/conn.go index 1893d3a0f..851f2003a 100644 --- a/client/conn.go +++ b/client/conn.go @@ -61,14 +61,12 @@ func getNetProto(addr string) string { // Connect to a MySQL server, addr can be ip:port, or a unix socket domain like /var/sock. // Accepts a series of configuration functions as a variadic argument. func Connect(addr string, user string, password string, dbName string, options ...func(*Conn)) (*Conn, error) { - proto := getNetProto(addr) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() dialer := &net.Dialer{} - return ConnectWithDialer(ctx, proto, addr, user, password, dbName, dialer.DialContext, options...) + return ConnectWithDialer(ctx, "", addr, user, password, dbName, dialer.DialContext, options...) } // Dialer connects to the address on the named network using the provided context.