Skip to content

Commit cdd955d

Browse files
committed
Support for PARTIAL_UPDATE_ROWS_EVENT binlog event and PARTIAL_JSON mode
1 parent b096301 commit cdd955d

12 files changed

+407
-81
lines changed

client/conn.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ import (
99
"strings"
1010
"time"
1111

12+
"github.com/pingcap/errors"
13+
1214
. "github.com/go-mysql-org/go-mysql/mysql"
1315
"github.com/go-mysql-org/go-mysql/packet"
1416
"github.com/go-mysql-org/go-mysql/utils"
15-
"github.com/pingcap/errors"
1617
)
1718

1819
type Conn struct {
@@ -198,6 +199,10 @@ func (c *Conn) GetServerVersion() string {
198199
return c.serverVersion
199200
}
200201

202+
func (c *Conn) CompareServerVersion(v string) (int, error) {
203+
return CompareServerVersions(c.serverVersion, v)
204+
}
205+
201206
func (c *Conn) Execute(command string, args ...interface{}) (*Result, error) {
202207
if len(args) == 0 {
203208
return c.exec(command)

client/resp.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"crypto/x509"
77
"encoding/binary"
88
"encoding/pem"
9+
"fmt"
910

1011
"github.com/pingcap/errors"
1112
"github.com/siddontang/go/hack"
@@ -60,9 +61,9 @@ func (c *Conn) handleOKPacket(data []byte) (*Result, error) {
6061
// pos += 2
6162
}
6263

63-
//new ok package will check CLIENT_SESSION_TRACK too, but I don't support it now.
64+
// new ok package will check CLIENT_SESSION_TRACK too, but I don't support it now.
6465

65-
//skip info
66+
// skip info
6667
return r, nil
6768
}
6869

@@ -75,7 +76,7 @@ func (c *Conn) handleErrorPacket(data []byte) error {
7576
pos += 2
7677

7778
if c.capability&CLIENT_PROTOCOL_41 > 0 {
78-
//skip '#'
79+
// skip '#'
7980
pos++
8081
e.State = hack.String(data[pos : pos+5])
8182
pos += 5
@@ -89,11 +90,11 @@ func (c *Conn) handleErrorPacket(data []byte) error {
8990
func (c *Conn) handleAuthResult() error {
9091
data, switchToPlugin, err := c.readAuthResult()
9192
if err != nil {
92-
return err
93+
return fmt.Errorf("readAuthResult: %w", err)
9394
}
9495
// handle auth switch, only support 'sha256_password', and 'caching_sha2_password'
9596
if switchToPlugin != "" {
96-
//fmt.Printf("now switching auth plugin to '%s'\n", switchToPlugin)
97+
// fmt.Printf("now switching auth plugin to '%s'\n", switchToPlugin)
9798
if data == nil {
9899
data = c.salt
99100
} else {
@@ -168,7 +169,7 @@ func (c *Conn) handleAuthResult() error {
168169
func (c *Conn) readAuthResult() ([]byte, string, error) {
169170
data, err := c.ReadPacket()
170171
if err != nil {
171-
return nil, "", err
172+
return nil, "", fmt.Errorf("ReadPacket: %w", err)
172173
}
173174

174175
// see: https://insidemysql.com/preparing-your-community-connector-for-mysql-8-part-2-sha256/
@@ -351,7 +352,7 @@ func (c *Conn) readResultColumns(result *Result) (err error) {
351352
if c.isEOFPacket(data) {
352353
if c.capability&CLIENT_PROTOCOL_41 > 0 {
353354
result.Warnings = binary.LittleEndian.Uint16(data[1:])
354-
//todo add strict_mode, warning will be treat as error
355+
// todo add strict_mode, warning will be treat as error
355356
result.Status = binary.LittleEndian.Uint16(data[3:])
356357
c.status = result.Status
357358
}
@@ -392,7 +393,7 @@ func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) {
392393
if c.isEOFPacket(data) {
393394
if c.capability&CLIENT_PROTOCOL_41 > 0 {
394395
result.Warnings = binary.LittleEndian.Uint16(data[1:])
395-
//todo add strict_mode, warning will be treat as error
396+
// todo add strict_mode, warning will be treat as error
396397
result.Status = binary.LittleEndian.Uint16(data[3:])
397398
c.status = result.Status
398399
}

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.16
55
require (
66
github.com/BurntSushi/toml v0.3.1
77
github.com/DataDog/zstd v1.5.2
8+
github.com/Masterminds/semver v1.5.0
89
github.com/go-sql-driver/mysql v1.6.0
910
github.com/google/uuid v1.3.0
1011
github.com/jmoiron/sqlx v1.3.3
@@ -16,6 +17,5 @@ require (
1617
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
1718
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07
1819
github.com/stretchr/testify v1.8.0
19-
golang.org/x/mod v0.3.0
2020
golang.org/x/tools v0.0.0-20201125231158-b5590deeca9b // indirect
2121
)

go.sum

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
22
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
33
github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8=
44
github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
5+
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
6+
github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
57
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
68
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
79
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM=
@@ -75,7 +77,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
7577
golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
7678
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
7779
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
78-
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
7980
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
8081
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
8182
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=

mysql/util.go

+18
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"strings"
1414
"time"
1515

16+
"github.com/Masterminds/semver"
1617
"github.com/pingcap/errors"
1718
"github.com/siddontang/go/hack"
1819
)
@@ -379,6 +380,23 @@ func ErrorEqual(err1, err2 error) bool {
379380
return e1.Error() == e2.Error()
380381
}
381382

383+
func CompareServerVersions(a, b string) (int, error) {
384+
var (
385+
aVer, bVer *semver.Version
386+
err error
387+
)
388+
389+
if aVer, err = semver.NewVersion(a); err != nil {
390+
return 0, fmt.Errorf("cannot parse %q as semver: %w", a, err)
391+
}
392+
393+
if bVer, err = semver.NewVersion(b); err != nil {
394+
return 0, fmt.Errorf("cannot parse %q as semver: %w", b, err)
395+
}
396+
397+
return aVer.Compare(bVer), nil
398+
}
399+
382400
var encodeRef = map[byte]byte{
383401
'\x00': '0',
384402
'\'': '\'',

mysql/util_test.go

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package mysql
2+
3+
import (
4+
"github.com/pingcap/check"
5+
)
6+
7+
type utilTestSuite struct {
8+
}
9+
10+
var _ = check.Suite(&utilTestSuite{})
11+
12+
func (s *utilTestSuite) TestCompareServerVersions(c *check.C) {
13+
tests := []struct {
14+
A string
15+
B string
16+
Expect int
17+
}{
18+
{A: "1.2.3", B: "1.2.3", Expect: 0},
19+
{A: "5.6-999", B: "8.0", Expect: -1},
20+
{A: "8.0.32-0ubuntu0.20.04.2", B: "8.0.28", Expect: 1},
21+
}
22+
23+
for _, test := range tests {
24+
comment := check.Commentf("%q vs. %q", test.A, test.B)
25+
26+
got, err := CompareServerVersions(test.A, test.B)
27+
c.Assert(err, check.IsNil, comment)
28+
c.Assert(got, check.Equals, test.Expect, comment)
29+
}
30+
}

replication/json_binary.go

+96-6
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ import (
55
"fmt"
66
"math"
77

8-
. "github.com/go-mysql-org/go-mysql/mysql"
98
"github.com/pingcap/errors"
109
"github.com/siddontang/go/hack"
10+
11+
. "github.com/go-mysql-org/go-mysql/mysql"
1112
)
1213

1314
const (
@@ -44,6 +45,60 @@ const (
4445
jsonbValueEntrySizeLarge = 1 + jsonbLargeOffsetSize
4546
)
4647

48+
var (
49+
ErrCorruptedJSONDiff = fmt.Errorf("corrupted JSON diff") // ER_CORRUPTED_JSON_DIFF
50+
)
51+
52+
type (
53+
// JsonDiffOperation is an enum that describes what kind of operation a JsonDiff object represents.
54+
// https://github.com/mysql/mysql-server/blob/8.0/sql/json_diff.h
55+
JsonDiffOperation byte
56+
)
57+
58+
const (
59+
// The JSON value in the given path is replaced with a new value.
60+
//
61+
// It has the same effect as `JSON_REPLACE(col, path, value)`.
62+
JsonDiffOperationReplace = JsonDiffOperation(iota)
63+
64+
// Add a new element at the given path.
65+
//
66+
// If the path specifies an array element, it has the same effect as `JSON_ARRAY_INSERT(col, path, value)`.
67+
//
68+
// If the path specifies an object member, it has the same effect as `JSON_INSERT(col, path, value)`.
69+
JsonDiffOperationInsert
70+
71+
// The JSON value at the given path is removed from an array or object.
72+
//
73+
// It has the same effect as `JSON_REMOVE(col, path)`.
74+
JsonDiffOperationRemove
75+
)
76+
77+
type (
78+
JsonDiff struct {
79+
Op JsonDiffOperation
80+
Path string
81+
Value string
82+
}
83+
)
84+
85+
func (op JsonDiffOperation) String() string {
86+
switch op {
87+
case JsonDiffOperationReplace:
88+
return "Replace"
89+
case JsonDiffOperationInsert:
90+
return "Insert"
91+
case JsonDiffOperationRemove:
92+
return "Remove"
93+
default:
94+
return fmt.Sprintf("Unknown(%d)", op)
95+
}
96+
}
97+
98+
func (jd *JsonDiff) String() string {
99+
return fmt.Sprintf("json_diff(op:%s path:%s value:%s)", jd.Op, jd.Path, jd.Value)
100+
}
101+
47102
func jsonbGetOffsetSize(isSmall bool) int {
48103
if isSmall {
49104
return jsonbSmallOffsetSize
@@ -71,11 +126,6 @@ func jsonbGetValueEntrySize(isSmall bool) int {
71126
// decodeJsonBinary decodes the JSON binary encoding data and returns
72127
// the common JSON encoding data.
73128
func (e *RowsEvent) decodeJsonBinary(data []byte) ([]byte, error) {
74-
// Sometimes, we can insert a NULL JSON even we set the JSON field as NOT NULL.
75-
// If we meet this case, we can return an empty slice.
76-
if len(data) == 0 {
77-
return []byte{}, nil
78-
}
79129
d := jsonBinaryDecoder{
80130
useDecimal: e.useDecimal,
81131
ignoreDecodeErr: e.ignoreJSONDecodeErr,
@@ -491,3 +541,43 @@ func (d *jsonBinaryDecoder) decodeVariableLength(data []byte) (int, int) {
491541

492542
return 0, 0
493543
}
544+
545+
func (e *RowsEvent) decodeJsonPartialBinary(data []byte) (*JsonDiff, error) {
546+
// see Json_diff_vector::read_binary() in mysql-server/sql/json_diff.cc
547+
operationNumber := JsonDiffOperation(data[0])
548+
switch operationNumber {
549+
case JsonDiffOperationReplace:
550+
case JsonDiffOperationInsert:
551+
case JsonDiffOperationRemove:
552+
default:
553+
return nil, ErrCorruptedJSONDiff
554+
}
555+
data = data[1:]
556+
557+
pathLength, _, n := LengthEncodedInt(data)
558+
data = data[n:]
559+
560+
path := data[:pathLength]
561+
data = data[pathLength:]
562+
563+
diff := &JsonDiff{
564+
Op: operationNumber,
565+
Path: string(path),
566+
// Value will be filled below
567+
}
568+
569+
if operationNumber == JsonDiffOperationRemove {
570+
return diff, nil
571+
}
572+
573+
valueLength, _, n := LengthEncodedInt(data)
574+
data = data[n:]
575+
576+
d, err := e.decodeJsonBinary(data[:valueLength])
577+
if err != nil {
578+
return nil, fmt.Errorf("cannot read json diff for field %q: %w", path, err)
579+
}
580+
diff.Value = string(d)
581+
582+
return diff, nil
583+
}

replication/parser.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
269269
UPDATE_ROWS_EVENTv1,
270270
WRITE_ROWS_EVENTv2,
271271
UPDATE_ROWS_EVENTv2,
272-
DELETE_ROWS_EVENTv2:
272+
DELETE_ROWS_EVENTv2,
273+
PARTIAL_UPDATE_ROWS_EVENT: // Extension of UPDATE_ROWS_EVENT, allowing partial values according to binlog_row_value_options
273274
e = p.newRowsEvent(h)
274275
case ROWS_QUERY_EVENT:
275276
e = &RowsQueryEvent{}
@@ -381,14 +382,17 @@ func (p *BinlogParser) verifyCrc32Checksum(rawData []byte) error {
381382

382383
func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
383384
e := &RowsEvent{}
384-
if p.format.EventTypeHeaderLengths[h.EventType-1] == 6 {
385+
386+
postHeaderLen := p.format.EventTypeHeaderLengths[h.EventType-1]
387+
if postHeaderLen == 6 {
385388
e.tableIDSize = 4
386389
} else {
387390
e.tableIDSize = 6
388391
}
389392

390393
e.needBitmap2 = false
391394
e.tables = p.tables
395+
e.eventType = h.EventType
392396
e.parseTime = p.parseTime
393397
e.timestampStringLocation = p.timestampStringLocation
394398
e.useDecimal = p.useDecimal
@@ -415,6 +419,9 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
415419
e.needBitmap2 = true
416420
case DELETE_ROWS_EVENTv2:
417421
e.Version = 2
422+
case PARTIAL_UPDATE_ROWS_EVENT:
423+
e.Version = 2
424+
e.needBitmap2 = true
418425
}
419426

420427
return e

0 commit comments

Comments
 (0)