From f5c1d887387b28a3ef5ff80bcceb3fe82d84ed86 Mon Sep 17 00:00:00 2001 From: "cucuridas@gamil.com" <3310223@naver.com> Date: Sun, 27 Aug 2023 16:45:41 +0900 Subject: [PATCH 01/10] feature - parse extra data --- replication/row_event.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/replication/row_event.go b/replication/row_event.go index df3f0380a..072931a41 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -879,6 +879,12 @@ type RowsEvent struct { // if version == 2 ExtraData []byte + NdbLength uint16 + NdbFormat []byte + NdbData []byte + PartitionId uint16 + SourceParitionId uint16 + // lenenc_int ColumnCount uint64 @@ -957,6 +963,25 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) { pos += 2 e.ExtraData = data[pos : pos+int(dataLen-2)] + if len(e.ExtraData) > 2 { + extraDataType := binary.LittleEndian.Uint16(e.ExtraData[:2]) + switch extraDataType { + case 0: + e.NdbLength = binary.LittleEndian.Uint16(e.ExtraData[2:3]) + e.NdbFormat = e.ExtraData[3:4] + var rangeData int = int(e.NdbLength) - 3 + e.NdbData = e.ExtraData[4:rangeData] + case 1: + if e.eventType == PARTIAL_UPDATE_ROWS_EVENT { + e.PartitionId = binary.LittleEndian.Uint16(e.ExtraData[:3]) + e.SourceParitionId = binary.LittleEndian.Uint16(e.ExtraData[3:5]) + }else { + e.PartitionId = binary.LittleEndian.Uint16(e.ExtraData[:3]) + } + default : + + } + } pos += int(dataLen - 2) } From 4f572c69e0dc1642467a470abd04a03f83989fcb Mon Sep 17 00:00:00 2001 From: "cucuridas@gamil.com" <3310223@naver.com> Date: Wed, 30 Aug 2023 17:38:47 +0900 Subject: [PATCH 02/10] fixed - parse extradata --- replication/row_event.go | 52 +++++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 072931a41..9f5a7cd3b 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -878,8 +878,9 @@ type RowsEvent struct { Flags uint16 // if version == 2 + DataLen uint16 ExtraData []byte - NdbLength uint16 + NdbLength uint8 NdbFormat []byte NdbData []byte PartitionId uint16 @@ -961,28 +962,10 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) { if e.Version == 2 { dataLen := binary.LittleEndian.Uint16(data[pos:]) pos += 2 - - e.ExtraData = data[pos : pos+int(dataLen-2)] - if len(e.ExtraData) > 2 { - extraDataType := binary.LittleEndian.Uint16(e.ExtraData[:2]) - switch extraDataType { - case 0: - e.NdbLength = binary.LittleEndian.Uint16(e.ExtraData[2:3]) - e.NdbFormat = e.ExtraData[3:4] - var rangeData int = int(e.NdbLength) - 3 - e.NdbData = e.ExtraData[4:rangeData] - case 1: - if e.eventType == PARTIAL_UPDATE_ROWS_EVENT { - e.PartitionId = binary.LittleEndian.Uint16(e.ExtraData[:3]) - e.SourceParitionId = binary.LittleEndian.Uint16(e.ExtraData[3:5]) - }else { - e.PartitionId = binary.LittleEndian.Uint16(e.ExtraData[:3]) - } - default : - - } + if dataLen > 2 { + e.decodeExtraData(pos,data) } - pos += int(dataLen - 2) + pos += int(e.DataLen - 2) } var n int @@ -1010,6 +993,31 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) { return pos, nil } +func (e *RowsEvent) decodeExtraData(pos int, data []byte) (err2 error) { + extraDataType := uint8(data[pos]) + pos +=1 + switch extraDataType { + case 0: + e.NdbLength = uint8(data[pos]) + pos +=1 + e.NdbFormat = data[pos:] + pos +=1 + e.NdbData = data[pos:] + pos += int(e.NdbLength-2) + case 1: + if e.eventType == PARTIAL_UPDATE_ROWS_EVENT { + e.PartitionId = binary.LittleEndian.Uint16(data[pos:]) + pos +=2 + e.SourceParitionId = binary.LittleEndian.Uint16(data[pos:]) + pos +=2 + }else { + e.PartitionId = binary.LittleEndian.Uint16(data[pos:]) + pos +=2 + } + } + return nil +} + func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) { // Rows_log_event::print_verbose() From ba1f8e3c81aa2c13f5f6a1384f6491de5c866736 Mon Sep 17 00:00:00 2001 From: "cucuridas@gamil.com" <3310223@naver.com> Date: Thu, 31 Aug 2023 22:56:58 +0900 Subject: [PATCH 03/10] fix typos and add comments --- replication/row_event.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 9f5a7cd3b..9acdd0c5e 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -878,8 +878,7 @@ type RowsEvent struct { Flags uint16 // if version == 2 - DataLen uint16 - ExtraData []byte + // Use when DataLen value is greater than 2 NdbLength uint8 NdbFormat []byte NdbData []byte @@ -965,7 +964,7 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) { if dataLen > 2 { e.decodeExtraData(pos,data) } - pos += int(e.DataLen - 2) + pos += int(dataLen - 2) } var n int @@ -1005,7 +1004,7 @@ func (e *RowsEvent) decodeExtraData(pos int, data []byte) (err2 error) { e.NdbData = data[pos:] pos += int(e.NdbLength-2) case 1: - if e.eventType == PARTIAL_UPDATE_ROWS_EVENT { + if e.eventType == UPDATE_ROWS_EVENTv2 { e.PartitionId = binary.LittleEndian.Uint16(data[pos:]) pos +=2 e.SourceParitionId = binary.LittleEndian.Uint16(data[pos:]) From 5fd15fe45f74dce2ab7eca4fb4d12cb5887221ad Mon Sep 17 00:00:00 2001 From: "cucuridas@gamil.com" <3310223@naver.com> Date: Thu, 31 Aug 2023 23:31:12 +0900 Subject: [PATCH 04/10] fix according to coding style --- replication/row_event.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 9acdd0c5e..5516cdb1c 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -962,7 +962,10 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) { dataLen := binary.LittleEndian.Uint16(data[pos:]) pos += 2 if dataLen > 2 { - e.decodeExtraData(pos,data) + err := e.decodeExtraData(pos,data) + if err != nil { + return 0, err + } } pos += int(dataLen - 2) } @@ -993,16 +996,15 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) { } func (e *RowsEvent) decodeExtraData(pos int, data []byte) (err2 error) { - extraDataType := uint8(data[pos]) + extraDataType := data[pos] pos +=1 switch extraDataType { case 0: - e.NdbLength = uint8(data[pos]) + e.NdbLength = data[pos] pos +=1 e.NdbFormat = data[pos:] pos +=1 - e.NdbData = data[pos:] - pos += int(e.NdbLength-2) + e.NdbData = data[pos:e.NdbLength-2] case 1: if e.eventType == UPDATE_ROWS_EVENTv2 { e.PartitionId = binary.LittleEndian.Uint16(data[pos:]) From 3a82f8ca3073cbd00da90043bb78af38b80b0ff4 Mon Sep 17 00:00:00 2001 From: "cucuridas@gamil.com" <3310223@naver.com> Date: Sun, 3 Sep 2023 14:30:09 +0900 Subject: [PATCH 05/10] Fix typos and change to formatting --- replication/row_event.go | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 5516cdb1c..3105ac4fd 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -879,12 +879,12 @@ type RowsEvent struct { // if version == 2 // Use when DataLen value is greater than 2 - NdbLength uint8 NdbFormat []byte - NdbData []byte - PartitionId uint16 - SourceParitionId uint16 + NdbData []byte + NdbLength byte + PartitionId uint16 + SourceParitionId uint16 // lenenc_int ColumnCount uint64 @@ -962,7 +962,7 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) { dataLen := binary.LittleEndian.Uint16(data[pos:]) pos += 2 if dataLen > 2 { - err := e.decodeExtraData(pos,data) + err := e.decodeExtraData(pos, data) if err != nil { return 0, err } @@ -997,23 +997,21 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) { func (e *RowsEvent) decodeExtraData(pos int, data []byte) (err2 error) { extraDataType := data[pos] - pos +=1 + pos += 1 switch extraDataType { case 0: e.NdbLength = data[pos] - pos +=1 + pos += 1 e.NdbFormat = data[pos:] - pos +=1 - e.NdbData = data[pos:e.NdbLength-2] + pos += 1 + e.NdbData = data[pos : e.NdbLength-2] case 1: if e.eventType == UPDATE_ROWS_EVENTv2 { e.PartitionId = binary.LittleEndian.Uint16(data[pos:]) - pos +=2 + pos += 2 e.SourceParitionId = binary.LittleEndian.Uint16(data[pos:]) - pos +=2 - }else { + } else { e.PartitionId = binary.LittleEndian.Uint16(data[pos:]) - pos +=2 } } return nil From 2a46034223b4ad79b021e3bc7f1223e42f499ab6 Mon Sep 17 00:00:00 2001 From: "cucuridas@gamil.com" <3310223@naver.com> Date: Tue, 5 Sep 2023 04:02:49 +0900 Subject: [PATCH 06/10] Fix Incorrect Byte Parsing in NDB Info from Extra Data --- replication/row_event.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 3105ac4fd..96f2ecc90 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -879,12 +879,11 @@ type RowsEvent struct { // if version == 2 // Use when DataLen value is greater than 2 - NdbFormat []byte + NdbFormat byte NdbData []byte - NdbLength byte - PartitionId uint16 - SourceParitionId uint16 + PartitionId uint16 + SourcePartitionId uint16 // lenenc_int ColumnCount uint64 @@ -1000,16 +999,16 @@ func (e *RowsEvent) decodeExtraData(pos int, data []byte) (err2 error) { pos += 1 switch extraDataType { case 0: - e.NdbLength = data[pos] + var ndbLength int = int(data[pos]) pos += 1 - e.NdbFormat = data[pos:] + e.NdbFormat = data[pos] pos += 1 - e.NdbData = data[pos : e.NdbLength-2] + e.NdbData = data[pos : pos+ndbLength-2] case 1: if e.eventType == UPDATE_ROWS_EVENTv2 { e.PartitionId = binary.LittleEndian.Uint16(data[pos:]) pos += 2 - e.SourceParitionId = binary.LittleEndian.Uint16(data[pos:]) + e.SourcePartitionId = binary.LittleEndian.Uint16(data[pos:]) } else { e.PartitionId = binary.LittleEndian.Uint16(data[pos:]) } @@ -1774,6 +1773,7 @@ func (e *RowsEvent) Dump(w io.Writer) { fmt.Fprintf(w, "TableID: %d\n", e.TableID) fmt.Fprintf(w, "Flags: %d\n", e.Flags) fmt.Fprintf(w, "Column count: %d\n", e.ColumnCount) + fmt.Fprintf(w, "NDB data: %s\n", e.NdbData) fmt.Fprintf(w, "Values:\n") for _, rows := range e.Rows { From 7a6d9189c0ab5dc1ff82937c3ae743167c0520df Mon Sep 17 00:00:00 2001 From: "cucuridas@gamil.com" <3310223@naver.com> Date: Tue, 5 Sep 2023 04:03:27 +0900 Subject: [PATCH 07/10] Add Test Code to Validate 'extradata' Parsing in 'rows event' --- replication/row_event_test.go | 120 ++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/replication/row_event_test.go b/replication/row_event_test.go index 57ebdf623..33d38d74d 100644 --- a/replication/row_event_test.go +++ b/replication/row_event_test.go @@ -1056,6 +1056,126 @@ func TestTableMapOptMetaVisibility(t *testing.T) { } } +func TestRowsDataExtraData(t *testing.T) { + // Only after mysql 8.0.16 version can be parsed from extradata to 'partition info' and 'ndb info' + testcases := []struct { + data []byte + tableData []byte + eventType EventType + expectPartitionId uint16 + expectSourcePartitionId uint16 + expectNdbFormat byte + expectNdbData []byte + }{ + /* + mysql-cluster 8.0.32 + + +-------+------+------+-----+---------+-------+ + | Field | Type | Null | Key | Default | Extra | + +-------+------+------+-----+---------+-------+ + | p | int | NO | PRI | NULL | | + | c | int | YES | UNI | NULL | | + +-------+------+------+-----+---------+-------+ + + CREATE TABLE t ( + p INT PRIMARY KEY, + c INT, + UNIQUE KEY u (c) + ) ENGINE NDB; + + INSERT INTO t VALUES (1,1), (2,2), (3,3), (4,4), (5,5); + */ + { + data: []byte("s\x00\x00\x00\x00\x00\x01\x00\x0f\x00\x00\f\x00\x01\x00\x00\x04\x80\x00\x04\x00\x00\x00\x02\xff\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x02\x00\x00\x00\x02\x00\x00\x00\x00\x04\x00\x00\x00\x04\x00\x00\x00\x00\x03\x00\x00\x00\x03\x00\x00\x00\x00\x05\x00\x00\x00\x05\x00\x00\x00"), + tableData: []byte("s\x00\x00\x00\x00\x00\x01\x00\abdteste\x00\x01t\x00\x02\x03\x03\x00\x02\x01\x01\x00"), + eventType: WRITE_ROWS_EVENTv2, + expectPartitionId: 0x0, + expectSourcePartitionId: 0x0, + expectNdbFormat: 0x0, + expectNdbData: []byte("\x01\x00\x00\x04\x80\x00\x04\x00\x00\x00"), + }, + /* + mysql 8.0.16 + + +-------+------+------+-----+---------+-------+ + | Field | Type | Null | Key | Default | Extra | + +-------+------+------+-----+---------+-------+ + | id | int | YES | | NULL | | + +-------+------+------+-----+---------+-------+ + + CREATE TABLE test (id INTEGER) + PARTITION BY RANGE (id) ( + PARTITION p0 VALUES LESS THAN (1), + PARTITION p1 VALUES LESS THAN (2), + PARTITION p2 VALUES LESS THAN (3), + PARTITION p3 VALUES LESS THAN (4), + PARTITION p4 VALUES LESS THAN (5) + ); + + INSERT INTO test (id) VALUES(3); + UPDATE test set id = 1 WHERE id = 3; + */ + { + data: []byte("p\x03\x00\x00\x00\x00\x01\x00\x05\x00\x01\x03\x00\x01\xff\x00\x03\x00\x00\x00"), + tableData: []byte("p\x03\x00\x00\x00\x00\x01\x00\x04test\x00\x04test\x00\x01\x03\x00\x01\x01\x01\x00"), + eventType: WRITE_ROWS_EVENTv2, + expectPartitionId: 0x3, + expectSourcePartitionId: 0x0, + expectNdbFormat: 0x0, + expectNdbData: []byte(nil), + }, + { + data: []byte("p\x03\x00\x00\x00\x00\x01\x00\a\x00\x01\x01\x00\x03\x00\x01\xff\xff\x00\x03\x00\x00\x00\x00\x01\x00\x00\x00"), + tableData: []byte("p\x03\x00\x00\x00\x00\x01\x00\x04test\x00\x04test\x00\x01\x03\x00\x01\x01\x01\x00"), + eventType: UPDATE_ROWS_EVENTv2, + expectPartitionId: 0x1, + expectSourcePartitionId: 0x3, + expectNdbFormat: 0x0, + expectNdbData: []byte(nil), + }, + // mysql 5.7 and mariadb 14(15) does not surpot extra data + { + data: []byte("m\x00\x00\x00\x00\x00\x01\x00\x02\x00\x01\xff\xfe\x03\x00\x00\x00"), + tableData: []byte("m\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x04test\x00\x01\x03\x00\x01"), + eventType: WRITE_ROWS_EVENTv2, + expectPartitionId: 0x0, + expectSourcePartitionId: 0x0, + expectNdbFormat: 0x0, + expectNdbData: []byte(nil), + }, + { + data: []byte("m\x00\x00\x00\x00\x00\x01\x00\x02\x00\x01\xff\xff\xfe\x03\x00\x00\x00\xfe\x01\x00\x00\x00"), + tableData: []byte("m\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x04test\x00\x01\x03\x00\x01"), + eventType: UPDATE_ROWS_EVENTv2, + expectPartitionId: 0x0, + expectSourcePartitionId: 0x0, + expectNdbFormat: 0x0, + expectNdbData: []byte(nil), + }, + } + + for _, tc := range testcases { + tableMapEvent := new(TableMapEvent) + tableMapEvent.tableIDSize = 6 + err := tableMapEvent.Decode(tc.tableData) + require.NoError(t, err) + + rowsEvent := new(RowsEvent) + rowsEvent.tableIDSize = 6 + rowsEvent.tables = make(map[uint64]*TableMapEvent) + rowsEvent.tables[tableMapEvent.TableID] = tableMapEvent + rowsEvent.Version = 2 + rowsEvent.eventType = tc.eventType + + err = rowsEvent.Decode(tc.data) + require.NoError(t, err) + require.Equal(t, tc.expectPartitionId, rowsEvent.PartitionId) + require.Equal(t, tc.expectSourcePartitionId, rowsEvent.SourcePartitionId) + require.Equal(t, tc.expectNdbFormat, rowsEvent.NdbFormat) + require.Equal(t, tc.expectNdbData, rowsEvent.NdbData) + } +} + func TestTableMapHelperMaps(t *testing.T) { /* CREATE TABLE `_types` ( From e200b75f79f603782975bf6a727a9a0a66bff11a Mon Sep 17 00:00:00 2001 From: "cucuridas@gamil.com" <3310223@naver.com> Date: Wed, 6 Sep 2023 15:18:54 +0900 Subject: [PATCH 08/10] fixed conding style --- replication/const.go | 5 +++++ replication/row_event.go | 9 +++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/replication/const.go b/replication/const.go index 9c98d9323..9ebd80b37 100644 --- a/replication/const.go +++ b/replication/const.go @@ -250,3 +250,8 @@ const ( LAST_INSERT_ID INSERT_ID ) + +const ( + ENUM_EXTRA_ROW_INFO_TYPECODE_NDB byte = iota + ENUM_EXTRA_ROW_INFO_TYPECODE_PARTITION +) diff --git a/replication/row_event.go b/replication/row_event.go index 96f2ecc90..b6494eed3 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -961,7 +961,7 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) { dataLen := binary.LittleEndian.Uint16(data[pos:]) pos += 2 if dataLen > 2 { - err := e.decodeExtraData(pos, data) + err := e.decodeExtraData(data[pos:]) if err != nil { return 0, err } @@ -994,17 +994,18 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) { return pos, nil } -func (e *RowsEvent) decodeExtraData(pos int, data []byte) (err2 error) { +func (e *RowsEvent) decodeExtraData(data []byte) (err2 error) { + pos := 0 extraDataType := data[pos] pos += 1 switch extraDataType { - case 0: + case ENUM_EXTRA_ROW_INFO_TYPECODE_NDB: var ndbLength int = int(data[pos]) pos += 1 e.NdbFormat = data[pos] pos += 1 e.NdbData = data[pos : pos+ndbLength-2] - case 1: + case ENUM_EXTRA_ROW_INFO_TYPECODE_PARTITION: if e.eventType == UPDATE_ROWS_EVENTv2 { e.PartitionId = binary.LittleEndian.Uint16(data[pos:]) pos += 2 From 948ae6ae2fc541cdee6ae7d5ccaa41d24c952134 Mon Sep 17 00:00:00 2001 From: "cucuridas@gamil.com" <3310223@naver.com> Date: Wed, 6 Sep 2023 15:19:16 +0900 Subject: [PATCH 09/10] Add event type to rows event data parsing for partition ID --- replication/row_event.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replication/row_event.go b/replication/row_event.go index b6494eed3..2910ff2d7 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1006,7 +1006,7 @@ func (e *RowsEvent) decodeExtraData(data []byte) (err2 error) { pos += 1 e.NdbData = data[pos : pos+ndbLength-2] case ENUM_EXTRA_ROW_INFO_TYPECODE_PARTITION: - if e.eventType == UPDATE_ROWS_EVENTv2 { + if e.eventType == UPDATE_ROWS_EVENTv2 || e.eventType == PARTIAL_UPDATE_ROWS_EVENT { e.PartitionId = binary.LittleEndian.Uint16(data[pos:]) pos += 2 e.SourcePartitionId = binary.LittleEndian.Uint16(data[pos:]) From c69d1699747f8b60c70badddfd7f83f729ae3fa0 Mon Sep 17 00:00:00 2001 From: "cucuridas@gamil.com" <3310223@naver.com> Date: Thu, 7 Sep 2023 22:01:07 +0900 Subject: [PATCH 10/10] Add event type to check partition Id in rows event --- replication/row_event.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replication/row_event.go b/replication/row_event.go index 2910ff2d7..87fe49dbf 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1006,7 +1006,7 @@ func (e *RowsEvent) decodeExtraData(data []byte) (err2 error) { pos += 1 e.NdbData = data[pos : pos+ndbLength-2] case ENUM_EXTRA_ROW_INFO_TYPECODE_PARTITION: - if e.eventType == UPDATE_ROWS_EVENTv2 || e.eventType == PARTIAL_UPDATE_ROWS_EVENT { + if e.eventType == UPDATE_ROWS_EVENTv1 || e.eventType == UPDATE_ROWS_EVENTv2 || e.eventType == PARTIAL_UPDATE_ROWS_EVENT { e.PartitionId = binary.LittleEndian.Uint16(data[pos:]) pos += 2 e.SourcePartitionId = binary.LittleEndian.Uint16(data[pos:])