Skip to content

Commit 759b960

Browse files
authored
Merge branch 'go-mysql-org:master' into master
2 parents 5f8ab29 + 313a953 commit 759b960

30 files changed

+883
-151
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
### Tag v1.3.0 (2021.06.10)
2+
* Init Resultset in Result when handling ddl statement. [#578](https://github.com/go-mysql-org/go-mysql/pull/578) ([romberli](https://github.com/romberli))
3+
* Add pool for client connections. [#584](https://github.com/go-mysql-org/go-mysql/pull/584) ([atercattus](https://github.com/atercattus))
4+
15
### Tag v1.2.1 (2021.05.27)
26
* Prevent panic on malformed auth data. [#557](https://github.com/go-mysql-org/go-mysql/pull/557) ([timvaillancourt](https://github.com/timvaillancourt))
37

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ A pure go library to handle MySQL network protocol and replication.
99
## How to migrate to this repo
1010
To change the used package in your repo it's enough to add this `replace` directive to your `go.mod`:
1111
```
12-
replace github.com/siddontang/go-mysql => github.com/go-mysql-org/go-mysql v1.2.1
12+
replace github.com/siddontang/go-mysql => github.com/go-mysql-org/go-mysql v1.3.0
1313
```
1414

15-
v1.2.1 - is the last tag in repo, feel free to choose what you want.
15+
v1.3.0 - is the last tag in repo, feel free to choose what you want.
1616

1717
## Changelog
1818
This repo uses [Changelog](CHANGELOG.md).

canal/canal.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ func (c *Canal) CheckBinlogRowImage(image string) error {
386386
// need to check MySQL binlog row image? full, minimal or noblob?
387387
// now only log
388388
if c.cfg.Flavor == mysql.MySQLFlavor {
389-
if res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "binlog_row_image"`); err != nil {
389+
if res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'`); err != nil {
390390
return errors.Trace(err)
391391
} else {
392392
// MySQL has binlog row image from 5.6, so older will return empty
@@ -401,7 +401,7 @@ func (c *Canal) CheckBinlogRowImage(image string) error {
401401
}
402402

403403
func (c *Canal) checkBinlogRowFormat() error {
404-
res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "binlog_format";`)
404+
res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE 'binlog_format';`)
405405
if err != nil {
406406
return errors.Trace(err)
407407
} else if f, _ := res.GetString(0, 1); f != "ROW" {

client/auth.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,18 @@ func (c *Conn) writeAuthHandshake() error {
140140
if !authPluginAllowed(c.authPluginName) {
141141
return fmt.Errorf("unknow auth plugin name '%s'", c.authPluginName)
142142
}
143-
// Adjust client capability flags based on server support
143+
144+
// Set default client capabilities that reflect the abilities of this library
144145
capability := CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION |
145-
CLIENT_LONG_PASSWORD | CLIENT_TRANSACTIONS | CLIENT_PLUGIN_AUTH | c.capability&CLIENT_LONG_FLAG
146+
CLIENT_LONG_PASSWORD | CLIENT_TRANSACTIONS | CLIENT_PLUGIN_AUTH
147+
// Adjust client capability flags based on server support
148+
capability |= c.capability & CLIENT_LONG_FLAG
149+
// Adjust client capability flags on specific client requests
150+
// Only flags that would make any sense setting and aren't handled elsewhere
151+
// in the library are supported here
152+
capability |= c.ccaps&CLIENT_FOUND_ROWS | c.ccaps&CLIENT_IGNORE_SPACE |
153+
c.ccaps&CLIENT_MULTI_STATEMENTS | c.ccaps&CLIENT_MULTI_RESULTS |
154+
c.ccaps&CLIENT_PS_MULTI_RESULTS | c.ccaps&CLIENT_CONNECT_ATTRS
146155

147156
// To enable TLS / SSL
148157
if c.tlsConfig != nil {

client/client_test.go

+35
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,41 @@ func (s *clientTestSuite) TestConn_Ping(c *C) {
9494
c.Assert(err, IsNil)
9595
}
9696

97+
func (s *clientTestSuite) TestConn_SetCapability(c *C) {
98+
caps := []uint32{
99+
mysql.CLIENT_LONG_PASSWORD,
100+
mysql.CLIENT_FOUND_ROWS,
101+
mysql.CLIENT_LONG_FLAG,
102+
mysql.CLIENT_CONNECT_WITH_DB,
103+
mysql.CLIENT_NO_SCHEMA,
104+
mysql.CLIENT_COMPRESS,
105+
mysql.CLIENT_ODBC,
106+
mysql.CLIENT_LOCAL_FILES,
107+
mysql.CLIENT_IGNORE_SPACE,
108+
mysql.CLIENT_PROTOCOL_41,
109+
mysql.CLIENT_INTERACTIVE,
110+
mysql.CLIENT_SSL,
111+
mysql.CLIENT_IGNORE_SIGPIPE,
112+
mysql.CLIENT_TRANSACTIONS,
113+
mysql.CLIENT_RESERVED,
114+
mysql.CLIENT_SECURE_CONNECTION,
115+
mysql.CLIENT_MULTI_STATEMENTS,
116+
mysql.CLIENT_MULTI_RESULTS,
117+
mysql.CLIENT_PS_MULTI_RESULTS,
118+
mysql.CLIENT_PLUGIN_AUTH,
119+
mysql.CLIENT_CONNECT_ATTRS,
120+
mysql.CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA,
121+
}
122+
123+
for _, cap := range caps {
124+
c.Assert(s.c.ccaps&cap > 0, IsFalse)
125+
s.c.SetCapability(cap)
126+
c.Assert(s.c.ccaps&cap > 0, IsTrue)
127+
s.c.UnsetCapability(cap)
128+
c.Assert(s.c.ccaps&cap > 0, IsFalse)
129+
}
130+
}
131+
97132
// NOTE for MySQL 5.5 and 5.6, server side has to config SSL to pass the TLS test, otherwise, it will throw error that
98133
// MySQL server does not support TLS required by the client. However, for MySQL 5.7 and above, auto generated certificates
99134
// are used by default so that manual config is no longer necessary.

client/conn.go

+13
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ type Conn struct {
2121
tlsConfig *tls.Config
2222
proto string
2323

24+
// server capabilities
2425
capability uint32
26+
// client-set capabilities only
27+
ccaps uint32
2528

2629
status uint16
2730

@@ -120,6 +123,16 @@ func (c *Conn) Ping() error {
120123
return nil
121124
}
122125

126+
// SetCapability enables the use of a specific capability
127+
func (c *Conn) SetCapability(cap uint32) {
128+
c.ccaps |= cap
129+
}
130+
131+
// UnsetCapability disables the use of a specific capability
132+
func (c *Conn) UnsetCapability(cap uint32) {
133+
c.ccaps &= ^cap
134+
}
135+
123136
// UseSSL: use default SSL
124137
// pass to options when connect
125138
func (c *Conn) UseSSL(insecureSkipVerify bool) {

client/pool.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -466,11 +466,12 @@ func (pool *Pool) startNewConnections(count int) {
466466

467467
func (pool *Pool) ping(conn *Conn) error {
468468
deadline := time.Now().Add(100 * time.Millisecond)
469-
_ = conn.SetWriteDeadline(deadline)
470-
_ = conn.SetReadDeadline(deadline)
469+
_ = conn.SetDeadline(deadline)
471470
err := conn.Ping()
472471
if err != nil {
473472
pool.logFunc(`Pool: ping query fail: %s`, err.Error())
473+
} else {
474+
_ = conn.SetDeadline(time.Time{})
474475
}
475476
return err
476477
}

client/resp.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ func (c *Conn) handleOKPacket(data []byte) (*Result, error) {
5252
pos += 2
5353

5454
//todo:strict_mode, check warnings as error
55-
//Warnings := binary.LittleEndian.Uint16(data[pos:])
56-
//pos += 2
55+
r.Warnings = binary.LittleEndian.Uint16(data[pos:])
56+
pos += 2
5757
} else if c.capability&CLIENT_TRANSACTIONS > 0 {
5858
r.Status = binary.LittleEndian.Uint16(data[pos:])
5959
c.status = r.Status
@@ -254,6 +254,7 @@ func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectP
254254
result.Status = okResult.Status
255255
result.AffectedRows = okResult.AffectedRows
256256
result.InsertId = okResult.InsertId
257+
result.Warnings = okResult.Warnings
257258
if result.Resultset == nil {
258259
result.Resultset = NewResultset(0)
259260
} else {
@@ -332,7 +333,7 @@ func (c *Conn) readResultColumns(result *Result) (err error) {
332333
// EOF Packet
333334
if c.isEOFPacket(data) {
334335
if c.capability&CLIENT_PROTOCOL_41 > 0 {
335-
//result.Warnings = binary.LittleEndian.Uint16(data[1:])
336+
result.Warnings = binary.LittleEndian.Uint16(data[1:])
336337
//todo add strict_mode, warning will be treat as error
337338
result.Status = binary.LittleEndian.Uint16(data[3:])
338339
c.status = result.Status
@@ -373,7 +374,7 @@ func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) {
373374
// EOF Packet
374375
if c.isEOFPacket(data) {
375376
if c.capability&CLIENT_PROTOCOL_41 > 0 {
376-
//result.Warnings = binary.LittleEndian.Uint16(data[1:])
377+
result.Warnings = binary.LittleEndian.Uint16(data[1:])
377378
//todo add strict_mode, warning will be treat as error
378379
result.Status = binary.LittleEndian.Uint16(data[3:])
379380
c.status = result.Status
@@ -421,7 +422,7 @@ func (c *Conn) readResultRowsStreaming(result *Result, isBinary bool, perRowCb S
421422
// EOF Packet
422423
if c.isEOFPacket(data) {
423424
if c.capability&CLIENT_PROTOCOL_41 > 0 {
424-
// result.Warnings = binary.LittleEndian.Uint16(data[1:])
425+
result.Warnings = binary.LittleEndian.Uint16(data[1:])
425426
// todo add strict_mode, warning will be treat as error
426427
result.Status = binary.LittleEndian.Uint16(data[3:])
427428
c.status = result.Status

client/stmt.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ type Stmt struct {
1313
conn *Conn
1414
id uint32
1515

16-
params int
17-
columns int
16+
params int
17+
columns int
18+
warnings int
1819
}
1920

2021
func (s *Stmt) ParamNum() int {
@@ -25,6 +26,10 @@ func (s *Stmt) ColumnNum() int {
2526
return s.columns
2627
}
2728

29+
func (s *Stmt) WarningsNum() int {
30+
return s.warnings
31+
}
32+
2833
func (s *Stmt) Execute(args ...interface{}) (*Result, error) {
2934
if err := s.write(args...); err != nil {
3035
return nil, errors.Trace(err)
@@ -196,7 +201,8 @@ func (c *Conn) Prepare(query string) (*Stmt, error) {
196201
pos += 2
197202

198203
//warnings
199-
//warnings = binary.LittleEndian.Uint16(data[pos:])
204+
s.warnings = int(binary.LittleEndian.Uint16(data[pos:]))
205+
pos += 2
200206

201207
if s.params > 0 {
202208
if err := s.conn.readUntilEOF(); err != nil {

cmd/go-mysqlbinlog/main.go

+24-5
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ import (
99
"fmt"
1010
"os"
1111

12+
"github.com/pingcap/errors"
13+
1214
"github.com/go-mysql-org/go-mysql/mysql"
1315
"github.com/go-mysql-org/go-mysql/replication"
14-
"github.com/pingcap/errors"
1516
)
1617

1718
var host = flag.String("host", "127.0.0.1", "MySQL host")
@@ -23,6 +24,7 @@ var flavor = flag.String("flavor", "mysql", "Flavor: mysql or mariadb")
2324

2425
var file = flag.String("file", "", "Binlog filename")
2526
var pos = flag.Int("pos", 4, "Binlog position")
27+
var gtid = flag.String("gtid", "", "Binlog GTID set that this slave has executed")
2628

2729
var semiSync = flag.Bool("semisync", false, "Support semi sync")
2830
var backupPath = flag.String("backup_path", "", "backup path to store binlog files")
@@ -56,10 +58,27 @@ func main() {
5658
return
5759
}
5860
} else {
59-
s, err := b.StartSync(pos)
60-
if err != nil {
61-
fmt.Printf("Start sync error: %v\n", errors.ErrorStack(err))
62-
return
61+
var (
62+
s *replication.BinlogStreamer
63+
err error
64+
)
65+
if len(*gtid) > 0 {
66+
gset, err := mysql.ParseGTIDSet(*flavor, *gtid)
67+
if err != nil {
68+
fmt.Printf("Failed to parse gtid %s with flavor %s, error: %v\n",
69+
*gtid, *flavor, errors.ErrorStack(err))
70+
}
71+
s, err = b.StartSyncGTID(gset)
72+
if err != nil {
73+
fmt.Printf("Start sync by GTID error: %v\n", errors.ErrorStack(err))
74+
return
75+
}
76+
} else {
77+
s, err = b.StartSync(pos)
78+
if err != nil {
79+
fmt.Printf("Start sync error: %v\n", errors.ErrorStack(err))
80+
return
81+
}
6382
}
6483

6584
for {

0 commit comments

Comments
 (0)