From 6a17fa5a160ff856f7448ca23882b6bff0482f7f Mon Sep 17 00:00:00 2001 From: equnchen Date: Wed, 10 Apr 2024 23:48:43 +0800 Subject: [PATCH 1/3] fix: (*RowsEvent).handleUnsigned() panic --- canal/rows.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/canal/rows.go b/canal/rows.go index 9e5df3981..9f618a9fb 100644 --- a/canal/rows.go +++ b/canal/rows.go @@ -53,6 +53,13 @@ func (r *RowsEvent) handleUnsigned() { for i := 0; i < len(r.Rows); i++ { for _, columnIdx := range r.Table.UnsignedColumns { + // When Canal.delay is big, after call Canal.StartFromGTID(), + // we will get the newest table schema (for example, after DDL "alter table add column xxx unsigned..."), + // but the binlog data can be very old (before DDL "alter table add column xxx unsigned..."), + // results in max(columnIdx) >= len(r.Rows[i]), then r.Rows[i][columnIdx] panic. + if columnIdx >= len(r.Rows[i]) { + continue + } switch value := r.Rows[i][columnIdx].(type) { case int8: r.Rows[i][columnIdx] = uint8(value) From d279b6b87c1115959f0aefb8f2069a34b422bde9 Mon Sep 17 00:00:00 2001 From: equnchen Date: Thu, 11 Apr 2024 19:32:13 +0800 Subject: [PATCH 2/3] add unit test: TestRowsEvent_handleUnsigned() --- canal/rows_test.go | 61 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 canal/rows_test.go diff --git a/canal/rows_test.go b/canal/rows_test.go new file mode 100644 index 000000000..c5673929e --- /dev/null +++ b/canal/rows_test.go @@ -0,0 +1,61 @@ +package canal + +import ( + "github.com/go-mysql-org/go-mysql/replication" + "github.com/go-mysql-org/go-mysql/schema" + "reflect" + "testing" +) + +func TestRowsEvent_handleUnsigned(t *testing.T) { + type fields struct { + Table *schema.Table + Action string + Rows [][]interface{} + Header *replication.EventHeader + } + tests := []struct { + name string + fields fields + wantRows [][]interface{} + }{ + { + name: "rows_event_handle_unsigned", + fields: fields{ + Table: &schema.Table{ + // columns 1,3,5,7,9 should be converted from signed to unsigned, + // column 10 is out of range and should be ignored, don't panic. + UnsignedColumns: []int{1, 3, 5, 7, 9, 10}, + }, + Rows: [][]interface{}{{ + int8(8), int8(8), + int16(16), int16(16), + int32(32), int32(32), + int64(64), int64(64), + int(128), int(128)}, + }, + }, + wantRows: [][]interface{}{{ + int8(8), uint8(8), + int16(16), uint16(16), + int32(32), uint32(32), + int64(64), uint64(64), + int(128), uint(128)}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &RowsEvent{ + Table: tt.fields.Table, + Action: tt.fields.Action, + Rows: tt.fields.Rows, + Header: tt.fields.Header, + } + r.handleUnsigned() + if !reflect.DeepEqual(tt.fields.Rows, tt.wantRows) { + t.Errorf("rows=%+v, wantRows=%+v, not equal", tt.fields.Rows, tt.wantRows) + } + }) + } +} From d0f803fef7daaf0621e74ce2d1775660c22e39ad Mon Sep 17 00:00:00 2001 From: equnchen Date: Thu, 11 Apr 2024 21:37:34 +0800 Subject: [PATCH 3/3] import require --- canal/rows_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/canal/rows_test.go b/canal/rows_test.go index c5673929e..dfbce0b39 100644 --- a/canal/rows_test.go +++ b/canal/rows_test.go @@ -1,10 +1,11 @@ package canal import ( + "testing" + "github.com/go-mysql-org/go-mysql/replication" "github.com/go-mysql-org/go-mysql/schema" - "reflect" - "testing" + "github.com/stretchr/testify/require" ) func TestRowsEvent_handleUnsigned(t *testing.T) { @@ -53,9 +54,7 @@ func TestRowsEvent_handleUnsigned(t *testing.T) { Header: tt.fields.Header, } r.handleUnsigned() - if !reflect.DeepEqual(tt.fields.Rows, tt.wantRows) { - t.Errorf("rows=%+v, wantRows=%+v, not equal", tt.fields.Rows, tt.wantRows) - } + require.Equal(t, tt.fields.Rows, tt.wantRows) }) } }