Skip to content

Add support for MariaDB compressed binlog events #786

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,11 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
}
var action string
switch e.Header.EventType {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2, replication.MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
action = InsertAction
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2, replication.MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
action = DeleteAction
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2, replication.MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:
action = UpdateAction
default:
return errors.Errorf("%s not supported now", e.Header.EventType)
Expand Down
20 changes: 20 additions & 0 deletions mysql/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package mysql

import (
"bytes"
"compress/zlib"
"crypto/rand"
"crypto/rsa"
"crypto/sha1"
Expand Down Expand Up @@ -92,6 +94,24 @@ func EncryptPassword(password string, seed []byte, pub *rsa.PublicKey) ([]byte,
return rsa.EncryptOAEP(sha1v, rand.Reader, pub, plain, nil)
}

func DecompressMariadbData(data []byte) ([]byte, error) {
// algorithm always 0=zlib
// algorithm := (data[pos] & 0x07) >> 4
headerSize := int(data[0] & 0x07)
uncompressedDataSize := BFixedLengthInt(data[1 : 1+headerSize])
uncompressedData := make([]byte, uncompressedDataSize)
r, err := zlib.NewReader(bytes.NewReader(data[1+headerSize:]))
if err != nil {
return nil, err
}
defer r.Close()
_, err = io.ReadFull(r, uncompressedData)
if err != nil {
return nil, err
}
return uncompressedData, nil
}

// AppendLengthEncodedInteger: encodes a uint64 value and appends it to the given bytes slice
func AppendLengthEncodedInteger(b []byte, n uint64) []byte {
switch {
Expand Down
15 changes: 15 additions & 0 deletions replication/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ const (
MARIADB_BINLOG_CHECKPOINT_EVENT
MARIADB_GTID_EVENT
MARIADB_GTID_LIST_EVENT
MARIADB_START_ENCRYPTION_EVENT
MARIADB_QUERY_COMPRESSED_EVENT
MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1
MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1
MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1
)

func (e EventType) String() string {
Expand Down Expand Up @@ -197,6 +202,16 @@ func (e EventType) String() string {
return "TransactionPayloadEvent"
case HEARTBEAT_LOG_EVENT_V2:
return "HeartbeatLogEventV2"
case MARIADB_START_ENCRYPTION_EVENT:
return "MariadbStartEncryptionEvent"
case MARIADB_QUERY_COMPRESSED_EVENT:
return "MariadbQueryCompressedEvent"
case MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
return "MariadbWriteRowsCompressedEventV1"
case MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:
return "MariadbUpdateRowsCompressedEventV1"
case MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
return "MariadbDeleteRowsCompressedEventV1"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can also support MARIADB_START_ENCRYPTION_EVENT and MARIADB_QUERY_COMPRESSED_EVENT

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added MARIADB_QUERY_COMPRESSED_EVENT.
Moved the uncompression functions to mysql package as it is shared.
Screenshot 2023-05-05 at 20 56 58
Screenshot 2023-05-05 at 20 59 41

As for MARIADB_START_ENCRYPTION_EVENT - I added this event only to fill enum values. It is tricky to support it. I added the text description, but since everything (including next event pos) is encrypted this library crashes when it is enabled.
Screenshot 2023-05-05 at 21 21 57


default:
return "UnknownEvent"
Expand Down
13 changes: 12 additions & 1 deletion replication/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,9 @@ type QueryEvent struct {
Schema []byte
Query []byte

// for mariadb QUERY_COMPRESSED_EVENT
compressed bool

// in fact QueryEvent dosen't have the GTIDSet information, just for beneficial to use
GSet GTIDSet
}
Expand Down Expand Up @@ -328,7 +331,15 @@ func (e *QueryEvent) Decode(data []byte) error {
//skip 0x00
pos++

e.Query = data[pos:]
if e.compressed {
decompressedQuery, err := DecompressMariadbData(data[pos:])
if err != nil {
return err
}
e.Query = decompressedQuery
} else {
e.Query = data[pos:]
}
return nil
}

Expand Down
18 changes: 18 additions & 0 deletions replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
switch h.EventType {
case QUERY_EVENT:
e = &QueryEvent{}
case MARIADB_QUERY_COMPRESSED_EVENT:
e = &QueryEvent{
compressed: true,
}
case XID_EVENT:
e = &XIDEvent{}
case TABLE_MAP_EVENT:
Expand All @@ -270,7 +274,11 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
WRITE_ROWS_EVENTv2,
UPDATE_ROWS_EVENTv2,
DELETE_ROWS_EVENTv2,
MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1,
MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1,
MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1,
PARTIAL_UPDATE_ROWS_EVENT: // Extension of UPDATE_ROWS_EVENT, allowing partial values according to binlog_row_value_options

e = p.newRowsEvent(h)
case ROWS_QUERY_EVENT:
e = &RowsQueryEvent{}
Expand Down Expand Up @@ -412,6 +420,16 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
case UPDATE_ROWS_EVENTv1:
e.Version = 1
e.needBitmap2 = true
case MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
e.Version = 1
e.compressed = true
case MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
e.Version = 1
e.compressed = true
case MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:
e.Version = 1
e.compressed = true
e.needBitmap2 = true
case WRITE_ROWS_EVENTv2:
e.Version = 2
case UPDATE_ROWS_EVENTv2:
Expand Down
14 changes: 12 additions & 2 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,9 @@ type RowsEvent struct {
tables map[uint64]*TableMapEvent
needBitmap2 bool

// for mariadb *_COMPRESSED_EVENT_V1
compressed bool

eventType EventType

Table *TableMapEvent
Expand Down Expand Up @@ -970,9 +973,9 @@ func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) {

var rowImageType EnumRowImageType
switch e.eventType {
case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2:
case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2, MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
rowImageType = EnumRowImageTypeWriteAI
case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2:
case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2, MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
rowImageType = EnumRowImageTypeDeleteBI
default:
rowImageType = EnumRowImageTypeUpdateBI
Expand Down Expand Up @@ -1002,6 +1005,13 @@ func (e *RowsEvent) Decode(data []byte) error {
if err != nil {
return err
}
if e.compressed {
uncompressedData, err := DecompressMariadbData(data[pos:])
if err != nil {
return err
}
return e.DecodeData(0, uncompressedData)
}
return e.DecodeData(pos, data)
}

Expand Down