From dd14348ca4a45a200144a361b88f368589e9b91b Mon Sep 17 00:00:00 2001 From: Aleksejs Sinicins Date: Thu, 4 May 2023 10:56:12 +0300 Subject: [PATCH 1/2] Add support for MariaDB compressed binlog events --- canal/sync.go | 6 +++--- replication/const.go | 11 +++++++++++ replication/parser.go | 14 ++++++++++++++ replication/row_event.go | 38 ++++++++++++++++++++++++++++++++++++-- 4 files changed, 64 insertions(+), 5 deletions(-) diff --git a/canal/sync.go b/canal/sync.go index a4ae8b583..7df181226 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -253,11 +253,11 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { } var action string switch e.Header.EventType { - case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: + case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2, replication.MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: action = InsertAction - case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: + case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2, replication.MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1: action = DeleteAction - case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: + case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2, replication.MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1: action = UpdateAction default: return errors.Errorf("%s not supported now", e.Header.EventType) diff --git a/replication/const.go b/replication/const.go index fc31763ba..37cbc1da1 100644 --- a/replication/const.go +++ b/replication/const.go @@ -101,6 +101,11 @@ const ( MARIADB_BINLOG_CHECKPOINT_EVENT MARIADB_GTID_EVENT MARIADB_GTID_LIST_EVENT + MARIADB_START_ENCRYPTION_EVENT + MARIADB_QUERY_COMPRESSED_EVENT + MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1 + MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1 + MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1 ) func (e EventType) String() string { @@ -197,6 +202,12 @@ func (e EventType) String() string { return "TransactionPayloadEvent" case HEARTBEAT_LOG_EVENT_V2: return "HeartbeatLogEventV2" + case MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: + return "MariadbWriteRowsCompressedEventV1" + case MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1: + return "MariadbUpdateRowsCompressedEventV1" + case MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1: + return "MariadbDeleteRowsCompressedEventV1" default: return "UnknownEvent" diff --git a/replication/parser.go b/replication/parser.go index 37b9c5a6c..6a728b35f 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -270,7 +270,11 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( WRITE_ROWS_EVENTv2, UPDATE_ROWS_EVENTv2, DELETE_ROWS_EVENTv2, + MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1, + MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1, + MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1, PARTIAL_UPDATE_ROWS_EVENT: // Extension of UPDATE_ROWS_EVENT, allowing partial values according to binlog_row_value_options + e = p.newRowsEvent(h) case ROWS_QUERY_EVENT: e = &RowsQueryEvent{} @@ -412,6 +416,16 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent { case UPDATE_ROWS_EVENTv1: e.Version = 1 e.needBitmap2 = true + case MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: + e.Version = 1 + e.compressed = true + case MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1: + e.Version = 1 + e.compressed = true + case MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1: + e.Version = 1 + e.compressed = true + e.needBitmap2 = true case WRITE_ROWS_EVENTv2: e.Version = 2 case UPDATE_ROWS_EVENTv2: diff --git a/replication/row_event.go b/replication/row_event.go index 6e68648eb..4bda2488d 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1,6 +1,8 @@ package replication import ( + "bytes" + "compress/zlib" "encoding/binary" "encoding/hex" "fmt" @@ -830,6 +832,9 @@ type RowsEvent struct { tables map[uint64]*TableMapEvent needBitmap2 bool + // for mariadb *_COMPRESSED_EVENT_V1 + compressed bool + eventType EventType Table *TableMapEvent @@ -970,9 +975,9 @@ func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) { var rowImageType EnumRowImageType switch e.eventType { - case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2: + case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2, MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: rowImageType = EnumRowImageTypeWriteAI - case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2: + case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2, MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1: rowImageType = EnumRowImageTypeDeleteBI default: rowImageType = EnumRowImageTypeUpdateBI @@ -1002,6 +1007,13 @@ func (e *RowsEvent) Decode(data []byte) error { if err != nil { return err } + if e.compressed { + uncompressedData, err := e.decompressData(pos, data) + if err != nil { + return err + } + return e.DecodeData(0, uncompressedData) + } return e.DecodeData(pos, data) } @@ -1103,6 +1115,28 @@ func (e *RowsEvent) parseFracTime(t interface{}) interface{} { return v.Time } +func (e *RowsEvent) decompressData(pos int, data []byte) ([]byte, error) { + // algorithm always 0=zlib + // algorithm := (data[pos] & 0x07) >> 4 + headerSize := int(data[pos] & 0x07) + pos++ + + uncompressedDataSize := BFixedLengthInt(data[pos : pos+headerSize]) + + pos += headerSize + uncompressedData := make([]byte, uncompressedDataSize) + r, err := zlib.NewReader(bytes.NewReader(data[pos:])) + if err != nil { + return nil, err + } + defer r.Close() + _, err = io.ReadFull(r, uncompressedData) + if err != nil { + return nil, err + } + return uncompressedData, nil +} + // see mysql sql/log_event.cc log_event_print_value func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16, isPartial bool) (v interface{}, n int, err error) { var length = 0 From 1bfe8cdab4cd82dc548be46d27d19495568fe465 Mon Sep 17 00:00:00 2001 From: Aleksejs Sinicins Date: Fri, 5 May 2023 21:05:53 +0300 Subject: [PATCH 2/2] Add support for MARIADB_QUERY_COMPRESSED_EVENT --- mysql/util.go | 20 ++++++++++++++++++++ replication/const.go | 4 ++++ replication/event.go | 13 ++++++++++++- replication/parser.go | 4 ++++ replication/row_event.go | 26 +------------------------- 5 files changed, 41 insertions(+), 26 deletions(-) diff --git a/mysql/util.go b/mysql/util.go index 6d8ec4471..5abe540bc 100644 --- a/mysql/util.go +++ b/mysql/util.go @@ -1,6 +1,8 @@ package mysql import ( + "bytes" + "compress/zlib" "crypto/rand" "crypto/rsa" "crypto/sha1" @@ -92,6 +94,24 @@ func EncryptPassword(password string, seed []byte, pub *rsa.PublicKey) ([]byte, return rsa.EncryptOAEP(sha1v, rand.Reader, pub, plain, nil) } +func DecompressMariadbData(data []byte) ([]byte, error) { + // algorithm always 0=zlib + // algorithm := (data[pos] & 0x07) >> 4 + headerSize := int(data[0] & 0x07) + uncompressedDataSize := BFixedLengthInt(data[1 : 1+headerSize]) + uncompressedData := make([]byte, uncompressedDataSize) + r, err := zlib.NewReader(bytes.NewReader(data[1+headerSize:])) + if err != nil { + return nil, err + } + defer r.Close() + _, err = io.ReadFull(r, uncompressedData) + if err != nil { + return nil, err + } + return uncompressedData, nil +} + // AppendLengthEncodedInteger: encodes a uint64 value and appends it to the given bytes slice func AppendLengthEncodedInteger(b []byte, n uint64) []byte { switch { diff --git a/replication/const.go b/replication/const.go index 37cbc1da1..2ea6fb185 100644 --- a/replication/const.go +++ b/replication/const.go @@ -202,6 +202,10 @@ func (e EventType) String() string { return "TransactionPayloadEvent" case HEARTBEAT_LOG_EVENT_V2: return "HeartbeatLogEventV2" + case MARIADB_START_ENCRYPTION_EVENT: + return "MariadbStartEncryptionEvent" + case MARIADB_QUERY_COMPRESSED_EVENT: + return "MariadbQueryCompressedEvent" case MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: return "MariadbWriteRowsCompressedEventV1" case MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1: diff --git a/replication/event.go b/replication/event.go index 5a1f5c654..264b230ca 100644 --- a/replication/event.go +++ b/replication/event.go @@ -297,6 +297,9 @@ type QueryEvent struct { Schema []byte Query []byte + // for mariadb QUERY_COMPRESSED_EVENT + compressed bool + // in fact QueryEvent dosen't have the GTIDSet information, just for beneficial to use GSet GTIDSet } @@ -328,7 +331,15 @@ func (e *QueryEvent) Decode(data []byte) error { //skip 0x00 pos++ - e.Query = data[pos:] + if e.compressed { + decompressedQuery, err := DecompressMariadbData(data[pos:]) + if err != nil { + return err + } + e.Query = decompressedQuery + } else { + e.Query = data[pos:] + } return nil } diff --git a/replication/parser.go b/replication/parser.go index 6a728b35f..00d37076b 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -249,6 +249,10 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( switch h.EventType { case QUERY_EVENT: e = &QueryEvent{} + case MARIADB_QUERY_COMPRESSED_EVENT: + e = &QueryEvent{ + compressed: true, + } case XID_EVENT: e = &XIDEvent{} case TABLE_MAP_EVENT: diff --git a/replication/row_event.go b/replication/row_event.go index 4bda2488d..a7fad168b 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1,8 +1,6 @@ package replication import ( - "bytes" - "compress/zlib" "encoding/binary" "encoding/hex" "fmt" @@ -1008,7 +1006,7 @@ func (e *RowsEvent) Decode(data []byte) error { return err } if e.compressed { - uncompressedData, err := e.decompressData(pos, data) + uncompressedData, err := DecompressMariadbData(data[pos:]) if err != nil { return err } @@ -1115,28 +1113,6 @@ func (e *RowsEvent) parseFracTime(t interface{}) interface{} { return v.Time } -func (e *RowsEvent) decompressData(pos int, data []byte) ([]byte, error) { - // algorithm always 0=zlib - // algorithm := (data[pos] & 0x07) >> 4 - headerSize := int(data[pos] & 0x07) - pos++ - - uncompressedDataSize := BFixedLengthInt(data[pos : pos+headerSize]) - - pos += headerSize - uncompressedData := make([]byte, uncompressedDataSize) - r, err := zlib.NewReader(bytes.NewReader(data[pos:])) - if err != nil { - return nil, err - } - defer r.Close() - _, err = io.ReadFull(r, uncompressedData) - if err != nil { - return nil, err - } - return uncompressedData, nil -} - // see mysql sql/log_event.cc log_event_print_value func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16, isPartial bool) (v interface{}, n int, err error) { var length = 0