From 0be0f1f14b462d33ca0da613ea0d6cb7c3953747 Mon Sep 17 00:00:00 2001 From: huangjunwei Date: Fri, 6 Oct 2023 14:24:13 +0800 Subject: [PATCH 1/5] handle subevents in transaction payload event --- canal/sync.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/canal/sync.go b/canal/sync.go index 5523d9fe4..8fe566d04 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -106,6 +106,23 @@ func (c *Canal) runSyncBinlog() error { } } continue + case *replication.TransactionPayloadEvent: + // handle subevent row by row + ev := ev.Event.(*replication.TransactionPayloadEvent) + for _, subEvent := range ev.Events { + err = c.handleRowsEvent(subEvent) + if err != nil { + e := errors.Cause(err) + // if error is not ErrExcludedTable or ErrTableNotExist or ErrMissingTableMeta, stop canal + if e != ErrExcludedTable && + e != schema.ErrTableNotExist && + e != schema.ErrMissingTableMeta { + c.cfg.Logger.Errorf("handle transaction payload rows event at (%s, %d) error %v", pos.Name, curPos, err) + return errors.Trace(err) + } + } + } + continue case *replication.XIDEvent: savePos = true // try to save the position later From 130a049cfa4f484c6bdd16d3a9b4c5880edd8673 Mon Sep 17 00:00:00 2001 From: huangjunwei Date: Sun, 8 Oct 2023 21:56:29 +0800 Subject: [PATCH 2/5] encap duplicate code --- canal/sync.go | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/canal/sync.go b/canal/sync.go index 8fe566d04..69f58c824 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -96,14 +96,8 @@ func (c *Canal) runSyncBinlog() error { // we only focus row based event err = c.handleRowsEvent(ev) if err != nil { - e := errors.Cause(err) - // if error is not ErrExcludedTable or ErrTableNotExist or ErrMissingTableMeta, stop canal - if e != ErrExcludedTable && - e != schema.ErrTableNotExist && - e != schema.ErrMissingTableMeta { - c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err) - return errors.Trace(err) - } + c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err) + return errors.Trace(err) } continue case *replication.TransactionPayloadEvent: @@ -112,14 +106,8 @@ func (c *Canal) runSyncBinlog() error { for _, subEvent := range ev.Events { err = c.handleRowsEvent(subEvent) if err != nil { - e := errors.Cause(err) - // if error is not ErrExcludedTable or ErrTableNotExist or ErrMissingTableMeta, stop canal - if e != ErrExcludedTable && - e != schema.ErrTableNotExist && - e != schema.ErrMissingTableMeta { - c.cfg.Logger.Errorf("handle transaction payload rows event at (%s, %d) error %v", pos.Name, curPos, err) - return errors.Trace(err) - } + c.cfg.Logger.Errorf("handle transaction payload rows event at (%s, %d) error %v", pos.Name, curPos, err) + return errors.Trace(err) } } continue @@ -249,10 +237,10 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { ev := e.Event.(*replication.RowsEvent) // Caveat: table may be altered at runtime. - schema := string(ev.Table.Schema) - table := string(ev.Table.Table) + schemaName := string(ev.Table.Schema) + tableName := string(ev.Table.Table) - t, err := c.GetTable(schema, table) + t, err := c.GetTable(schemaName, tableName) if err != nil { return err } @@ -268,7 +256,17 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { return errors.Errorf("%s not supported now", e.Header.EventType) } events := newRowsEvent(t, action, ev.Rows, e.Header) - return c.eventHandler.OnRow(events) + err = c.eventHandler.OnRow(events) + if err != nil { + e := errors.Cause(err) + if e != ErrExcludedTable && + e != schema.ErrTableNotExist && + e != schema.ErrMissingTableMeta { + return err + } + } + + return nil } func (c *Canal) FlushBinlog() error { From 69234e2a2d0d83d2c47734c4a718a2ad0e79fdc4 Mon Sep 17 00:00:00 2001 From: huangjunwei Date: Mon, 9 Oct 2023 21:47:24 +0800 Subject: [PATCH 3/5] fix error handle --- canal/sync.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/canal/sync.go b/canal/sync.go index 69f58c824..ff2eccfd1 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -242,6 +242,12 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { t, err := c.GetTable(schemaName, tableName) if err != nil { + e := errors.Cause(err) + // ignore errors below + if e == ErrExcludedTable || e == schema.ErrTableNotExist || e == schema.ErrMissingTableMeta { + err = nil + } + return err } var action string @@ -256,17 +262,7 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { return errors.Errorf("%s not supported now", e.Header.EventType) } events := newRowsEvent(t, action, ev.Rows, e.Header) - err = c.eventHandler.OnRow(events) - if err != nil { - e := errors.Cause(err) - if e != ErrExcludedTable && - e != schema.ErrTableNotExist && - e != schema.ErrMissingTableMeta { - return err - } - } - - return nil + return c.eventHandler.OnRow(events) } func (c *Canal) FlushBinlog() error { From 34b8ce430641d0a47ee862b692d1cb9606370690 Mon Sep 17 00:00:00 2001 From: huangjunwei Date: Thu, 14 Dec 2023 23:35:12 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E4=BD=BF=E7=94=A8json-iterator=E4=BC=98?= =?UTF-8?q?=E5=8C=96json=E5=BA=8F=E5=88=97=E5=8C=96=E6=B6=88=E8=80=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 3 +++ go.sum | 7 +++++++ replication/json_binary.go | 2 +- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 6c3ba1330..5da529ae3 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/go-sql-driver/mysql v1.7.1 github.com/google/uuid v1.3.0 github.com/jmoiron/sqlx v1.3.3 + github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.1 github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 github.com/pingcap/tidb/pkg/parser v0.0.0-20231103042308-035ad5ccbe67 @@ -21,6 +22,8 @@ require ( github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/kr/pretty v0.3.1 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index aaa8f75b5..b172df079 100644 --- a/go.sum +++ b/go.sum @@ -13,10 +13,13 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jmoiron/sqlx v1.3.3 h1:j82X0bf7oQ27XeqxicSZsTU5suPwKElg3oyxNn43iTk= github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g= github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -30,6 +33,10 @@ github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 h1:m5ZsBa5o/0CkzZXfXLaThzKuR85SnHHetqBCpzQ30h8= diff --git a/replication/json_binary.go b/replication/json_binary.go index 7a8217181..74630d3f7 100644 --- a/replication/json_binary.go +++ b/replication/json_binary.go @@ -1,10 +1,10 @@ package replication import ( - "encoding/json" "fmt" "math" + json "github.com/json-iterator/go" "github.com/pingcap/errors" "github.com/siddontang/go/hack" From c474ffab596c2547558ad8dd2feaf2b68b63e148 Mon Sep 17 00:00:00 2001 From: huangjunwei Date: Wed, 27 Dec 2023 15:44:33 +0800 Subject: [PATCH 5/5] use go-json instead --- go.mod | 4 +--- go.sum | 9 ++------- replication/json_binary.go | 2 +- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index 5da529ae3..de6c086b6 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,9 @@ require ( github.com/BurntSushi/toml v1.3.2 github.com/Masterminds/semver v1.5.0 github.com/go-sql-driver/mysql v1.7.1 + github.com/goccy/go-json v0.10.2 github.com/google/uuid v1.3.0 github.com/jmoiron/sqlx v1.3.3 - github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.1 github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 github.com/pingcap/tidb/pkg/parser v0.0.0-20231103042308-035ad5ccbe67 @@ -22,8 +22,6 @@ require ( github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/kr/pretty v0.3.1 // indirect - github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect - github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index b172df079..0c7e0fd61 100644 --- a/go.sum +++ b/go.sum @@ -13,13 +13,12 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= -github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jmoiron/sqlx v1.3.3 h1:j82X0bf7oQ27XeqxicSZsTU5suPwKElg3oyxNn43iTk= github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= -github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= -github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g= github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -33,10 +32,6 @@ github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= -github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= -github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= -github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 h1:m5ZsBa5o/0CkzZXfXLaThzKuR85SnHHetqBCpzQ30h8= diff --git a/replication/json_binary.go b/replication/json_binary.go index 74630d3f7..065dd6b2e 100644 --- a/replication/json_binary.go +++ b/replication/json_binary.go @@ -4,7 +4,7 @@ import ( "fmt" "math" - json "github.com/json-iterator/go" + "github.com/goccy/go-json" "github.com/pingcap/errors" "github.com/siddontang/go/hack"