Skip to content

Decoding of compressed binlog events #773

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Feb 25, 2023
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.16

require (
github.com/BurntSushi/toml v0.3.1
github.com/DataDog/zstd v1.5.2
github.com/go-sql-driver/mysql v1.6.0
github.com/google/uuid v1.3.0
github.com/jmoiron/sqlx v1.3.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8=
github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM=
Expand Down
9 changes: 9 additions & 0 deletions replication/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ const (
TRANSACTION_CONTEXT_EVENT
VIEW_CHANGE_EVENT
XA_PREPARE_LOG_EVENT
PARTIAL_UPDATE_ROWS_EVENT
TRANSACTION_PAYLOAD_EVENT
HEARTBEAT_LOG_EVENT_V2
)

const (
Expand Down Expand Up @@ -188,6 +191,12 @@ func (e EventType) String() string {
return "ViewChangeEvent"
case XA_PREPARE_LOG_EVENT:
return "XAPrepareLogEvent"
case PARTIAL_UPDATE_ROWS_EVENT:
return "PartialUpdateRowsEvent"
case TRANSACTION_PAYLOAD_EVENT:
return "TransactionPayloadEvent"
case HEARTBEAT_LOG_EVENT_V2:
return "HeartbeatLogEventV2"

default:
return "UnknownEvent"
Expand Down
9 changes: 9 additions & 0 deletions replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
e = &PreviousGTIDsEvent{}
case INTVAR_EVENT:
e = &IntVarEvent{}
case TRANSACTION_PAYLOAD_EVENT:
e = p.newTransactionPayloadEvent()
default:
e = &GenericEvent{}
}
Expand Down Expand Up @@ -417,3 +419,10 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {

return e
}

// newTransactionPayloadEvent builds an empty TransactionPayloadEvent whose
// format field holds a copy of the parser's current format description.
func (p *BinlogParser) newTransactionPayloadEvent() *TransactionPayloadEvent {
	return &TransactionPayloadEvent{format: *p.format}
}
149 changes: 149 additions & 0 deletions replication/transaction_payload_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package replication

import (
"encoding/binary"
"encoding/hex"
"fmt"
"io"

"github.com/DataDog/zstd"

. "github.com/go-mysql-org/go-mysql/mysql"
)

// Field types of the transaction payload header as they appear on the wire.
// The numeric values are spelled out because they are part of the binlog
// format and must not shift if entries are reordered.
// See also binary_log::codecs::binary::Transaction_payload::fields in MySQL
// https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1codecs_1_1binary_1_1Transaction__payload.html#a9fff7ac12ba064f40e9216565c53d07b
const (
	// OTW_PAYLOAD_HEADER_END_MARK terminates the header; the payload follows it.
	OTW_PAYLOAD_HEADER_END_MARK = 0
	// OTW_PAYLOAD_SIZE_FIELD carries the size of the payload.
	OTW_PAYLOAD_SIZE_FIELD = 1
	// OTW_PAYLOAD_COMPRESSION_TYPE_FIELD carries the compression type.
	OTW_PAYLOAD_COMPRESSION_TYPE_FIELD = 2
	// OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD carries the uncompressed payload size.
	OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD = 3
)

// Compression Types
// Values compared against TransactionPayloadEvent.CompressionType, which is
// read from the OTW_PAYLOAD_COMPRESSION_TYPE_FIELD header field.
const (
// ZSTD is the only compression type decodePayload accepts; the payload is
// decompressed with zstd.
ZSTD = 0
// NONE is reported as "NONE" by compressionType; decodePayload currently
// rejects it with an error.
NONE = 255
)

// TransactionPayloadEvent is a binlog event that wraps a compressed payload
// containing further binlog events. Decode fills the header fields and
// Payload, then decompresses the payload and parses it into Events.
type TransactionPayloadEvent struct {
// format is a copy of the enclosing parser's format description; it is
// used to configure the parser that decodes the embedded events.
format FormatDescriptionEvent
// Size is the value of the OTW_PAYLOAD_SIZE_FIELD header field.
Size uint64
// UncompressedSize is the value of the OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD
// header field.
UncompressedSize uint64
// CompressionType is the value of the OTW_PAYLOAD_COMPRESSION_TYPE_FIELD
// header field (see ZSTD and NONE).
CompressionType uint64
// Payload holds the raw bytes that follow the header end mark.
Payload []byte
// Events holds the events parsed from the decompressed payload.
Events []*BinlogEvent
}

// compressionType returns a human-readable name for e.CompressionType:
// "ZSTD", "NONE", or "Unknown" for any other value.
func (e *TransactionPayloadEvent) compressionType() string {
	if e.CompressionType == ZSTD {
		return "ZSTD"
	}
	if e.CompressionType == NONE {
		return "NONE"
	}
	return "Unknown"
}

// Dump writes a human-readable description of the event to w: the header
// fields, a hex dump of the raw payload, and then the dump of every event
// decoded from the payload, framed by start and end marker lines.
func (e *TransactionPayloadEvent) Dump(w io.Writer) {
	// Header summary.
	fmt.Fprintf(w, "Payload Size: %d\n", e.Size)
	fmt.Fprintf(w, "Payload Uncompressed Size: %d\n", e.UncompressedSize)
	fmt.Fprintf(w, "Payload CompressionType: %s\n", e.compressionType())

	// Raw (still compressed) payload bytes.
	body := hex.Dump(e.Payload)
	fmt.Fprintf(w, "Payload Body: \n%s", body)

	// Embedded events, in the order they were decoded.
	fmt.Fprintln(w, "=== Start of events decoded from compressed payload ===")
	for i := range e.Events {
		e.Events[i].Dump(w)
	}
	fmt.Fprintln(w, "=== End of events decoded from compressed payload ===")
	fmt.Fprintln(w)
}

// Decode parses the event body in data: first the header fields, then the
// payload. It returns the first error encountered by either step.
func (e *TransactionPayloadEvent) Decode(data []byte) error {
	if err := e.decodeFields(data); err != nil {
		return err
	}

	return e.decodePayload()
}

// decodeFields parses the header at the start of data. The header is a
// sequence of (type, length, value) fields terminated by an
// OTW_PAYLOAD_HEADER_END_MARK type byte; everything after the end mark is
// stored in e.Payload. Known field types populate e.Size, e.CompressionType
// and e.UncompressedSize; unknown field types are skipped.
//
// It returns an error, rather than panicking, when data ends before the end
// mark is found or when a field's declared length runs past the end of data.
//
// NOTE(review): type, length and value are all read with FixedLengthInt. If
// the server writes them as length-encoded integers, values >= 251 would carry
// a prefix byte that ends up folded into the result — confirm against the
// MySQL Transaction_payload codec.
func (e *TransactionPayloadEvent) decodeFields(data []byte) error {
	total := uint64(len(data))
	offset := uint64(0)

	for {
		if offset >= total {
			return fmt.Errorf("TransactionPayloadEvent header truncated: no end mark within %d bytes", total)
		}
		fieldType := FixedLengthInt(data[offset : offset+1])
		offset++

		if fieldType == OTW_PAYLOAD_HEADER_END_MARK {
			// offset <= total here, so this yields an empty payload at worst.
			e.Payload = data[offset:]
			return nil
		}

		if offset >= total {
			return fmt.Errorf("TransactionPayloadEvent header truncated: field type %d has no length byte", fieldType)
		}
		fieldLength := FixedLengthInt(data[offset : offset+1])
		offset++

		// offset <= total is guaranteed above, so the subtraction cannot wrap.
		if fieldLength > total-offset {
			return fmt.Errorf("TransactionPayloadEvent header truncated: field type %d declares %d bytes but only %d remain",
				fieldType, fieldLength, total-offset)
		}
		value := data[offset : offset+fieldLength]

		switch fieldType {
		case OTW_PAYLOAD_SIZE_FIELD:
			e.Size = FixedLengthInt(value)
		case OTW_PAYLOAD_COMPRESSION_TYPE_FIELD:
			e.CompressionType = FixedLengthInt(value)
		case OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD:
			e.UncompressedSize = FixedLengthInt(value)
		}

		offset += fieldLength
	}
}

// decodePayload decompresses e.Payload and parses the result into individual
// binlog events, appending them to e.Events.
//
// Only ZSTD payloads are supported; any other compression type yields an
// error. An error is also returned when decompression fails, when an embedded
// event declares a length that does not fit in the remaining payload, or when
// parsing an embedded event fails. Trailing bytes too short to hold an event
// length field are ignored.
func (e *TransactionPayloadEvent) decodePayload() error {
	if e.CompressionType != ZSTD {
		return fmt.Errorf("TransactionPayloadEvent has compression type %d (%s)",
			e.CompressionType, e.compressionType())
	}

	payloadUncompressed, err := zstd.Decompress(nil, e.Payload)
	if err != nil {
		return err
	}

	// The uncompressed data needs to be split up into individual events for
	// Parse() to work on them. A dedicated parser is used because checksums
	// must be disabled for the embedded events, while the remaining settings
	// still come from the FormatDescriptionEvent this event was created with.
	parser := NewBinlogParser()
	parser.format = &FormatDescriptionEvent{
		Version:                e.format.Version,
		ServerVersion:          e.format.ServerVersion,
		CreateTimestamp:        e.format.CreateTimestamp,
		EventHeaderLength:      e.format.EventHeaderLength,
		EventTypeHeaderLengths: e.format.EventTypeHeaderLengths,
		ChecksumAlgorithm:      BINLOG_CHECKSUM_ALG_OFF,
	}

	// The 4-byte little-endian event length is read from bytes [9, 13) of
	// each event.
	const (
		lengthFieldStart = 9
		lengthFieldEnd   = 13
	)

	// All offset arithmetic is done in uint64: adding an attacker-controlled
	// uint32 event length to a uint32 offset could wrap around and slip past
	// the bounds check, leading to a slice panic.
	total := uint64(len(payloadUncompressed))
	offset := uint64(0)
	for offset+lengthFieldEnd <= total {
		eventLength := binary.LittleEndian.Uint32(
			payloadUncompressed[offset+lengthFieldStart : offset+lengthFieldEnd])
		// offset < total inside the loop, so total-offset cannot wrap.
		if uint64(eventLength) > total-offset {
			return fmt.Errorf("Event length of %d with offset %d in uncompressed payload exceeds payload length of %d",
				eventLength, offset, total)
		}
		// An event cannot be shorter than the prefix that holds its own
		// length; rejecting it here also guarantees the loop makes progress.
		if eventLength < lengthFieldEnd {
			return fmt.Errorf("Event length of %d with offset %d in uncompressed payload is shorter than the event header",
				eventLength, offset)
		}
		next := offset + uint64(eventLength)

		pe, err := parser.Parse(payloadUncompressed[offset:next])
		if err != nil {
			return err
		}
		e.Events = append(e.Events, pe)

		offset = next
	}

	return nil
}