Skip to content

Commit 3dc80ac

Browse files
authored
Merge pull request #759 from Fizic/master
Adding replication protocol support to mysql server implementation
2 parents 81966e1 + 60d5d2f commit 3dc80ac

File tree

6 files changed

+133
-2
lines changed

6 files changed

+133
-2
lines changed

replication/binlogstreamer.go

+22-1
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,32 @@ func (s *BinlogStreamer) closeWithError(err error) {
8484
}
8585
}
8686

87-
func newBinlogStreamer() *BinlogStreamer {
87+
func NewBinlogStreamer() *BinlogStreamer {
8888
s := new(BinlogStreamer)
8989

9090
s.ch = make(chan *BinlogEvent, 10240)
9191
s.ech = make(chan error, 4)
9292

9393
return s
9494
}
95+
96+
// AddEventToStreamer adds a binlog event to the streamer. You can use it when you want to add an event to the streamer manually.
97+
// can be used in replication handlers
98+
func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error {
99+
select {
100+
case s.ch <- ev:
101+
return nil
102+
case err := <-s.ech:
103+
return err
104+
}
105+
}
106+
107+
// AddErrorToStreamer adds an error to the streamer.
108+
func (s *BinlogStreamer) AddErrorToStreamer(err error) bool {
109+
select {
110+
case s.ech <- err:
111+
return true
112+
default:
113+
return false
114+
}
115+
}

replication/binlogsyncer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ func (b *BinlogSyncer) prepare() error {
393393
func (b *BinlogSyncer) startDumpStream() *BinlogStreamer {
394394
b.running = true
395395

396-
s := newBinlogStreamer()
396+
s := NewBinlogStreamer()
397397

398398
b.wg.Add(1)
399399
go b.onStream(s)

server/command.go

+58
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66

77
. "github.com/go-mysql-org/go-mysql/mysql"
8+
"github.com/go-mysql-org/go-mysql/replication"
89
"github.com/siddontang/go/hack"
910
)
1011

@@ -30,6 +31,13 @@ type Handler interface {
3031
HandleOtherCommand(cmd byte, data []byte) error
3132
}
3233

34+
type ReplicationHandler interface {
35+
// handle Replication command
36+
HandleRegisterSlave(data []byte) error
37+
HandleBinlogDump(pos Position) (*replication.BinlogStreamer, error)
38+
HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet) (*replication.BinlogStreamer, error)
39+
}
40+
3341
func (c *Conn) HandleCommand() error {
3442
if c.Conn == nil {
3543
return fmt.Errorf("connection closed")
@@ -131,6 +139,40 @@ func (c *Conn) dispatch(data []byte) interface{} {
131139
}
132140

133141
return eofResponse{}
142+
case COM_REGISTER_SLAVE:
143+
if h, ok := c.h.(ReplicationHandler); ok {
144+
return h.HandleRegisterSlave(data)
145+
} else {
146+
return c.h.HandleOtherCommand(cmd, data)
147+
}
148+
case COM_BINLOG_DUMP:
149+
if h, ok := c.h.(ReplicationHandler); ok {
150+
pos, err := parseBinlogDump(data)
151+
if err != nil {
152+
return err
153+
}
154+
if s, err := h.HandleBinlogDump(pos); err != nil {
155+
return err
156+
} else {
157+
return s
158+
}
159+
} else {
160+
return c.h.HandleOtherCommand(cmd, data)
161+
}
162+
case COM_BINLOG_DUMP_GTID:
163+
if h, ok := c.h.(ReplicationHandler); ok {
164+
gtidSet, err := parseBinlogDumpGTID(data)
165+
if err != nil {
166+
return err
167+
}
168+
if s, err := h.HandleBinlogDumpGTID(gtidSet); err != nil {
169+
return err
170+
} else {
171+
return s
172+
}
173+
} else {
174+
return c.h.HandleOtherCommand(cmd, data)
175+
}
134176
default:
135177
return c.h.HandleOtherCommand(cmd, data)
136178
}
@@ -139,6 +181,10 @@ func (c *Conn) dispatch(data []byte) interface{} {
139181
type EmptyHandler struct {
140182
}
141183

184+
type EmptyReplicationHandler struct {
185+
EmptyHandler
186+
}
187+
142188
func (h EmptyHandler) UseDB(dbName string) error {
143189
return nil
144190
}
@@ -160,6 +206,18 @@ func (h EmptyHandler) HandleStmtClose(context interface{}) error {
160206
return nil
161207
}
162208

209+
func (h EmptyReplicationHandler) HandleRegisterSlave(data []byte) error {
210+
return fmt.Errorf("not supported now")
211+
}
212+
213+
func (h EmptyReplicationHandler) HandleBinlogDump(pos Position) (*replication.BinlogStreamer, error) {
214+
return nil, fmt.Errorf("not supported now")
215+
}
216+
217+
func (h EmptyReplicationHandler) HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet) (*replication.BinlogStreamer, error) {
218+
return nil, fmt.Errorf("not supported now")
219+
}
220+
163221
func (h EmptyHandler) HandleOtherCommand(cmd byte, data []byte) error {
164222
return NewError(
165223
ER_UNKNOWN_ERROR,

server/command_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ package server
22

33
// Ensure EmptyHandler implements Handler interface or cause compile time error
44
var _ Handler = EmptyHandler{}
5+
var _ ReplicationHandler = EmptyReplicationHandler{}

server/replication.go

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package server
2+
3+
import (
4+
"encoding/binary"
5+
6+
"github.com/go-mysql-org/go-mysql/mysql"
7+
)
8+
9+
func parseBinlogDump(data []byte) (mysql.Position, error) {
10+
if len(data) < 10 {
11+
return mysql.Position{}, mysql.ErrMalformPacket
12+
}
13+
var p mysql.Position
14+
p.Pos = binary.LittleEndian.Uint32(data[0:4])
15+
p.Name = string(data[10:])
16+
17+
return p, nil
18+
}
19+
20+
func parseBinlogDumpGTID(data []byte) (*mysql.MysqlGTIDSet, error) {
21+
if len(data) < 15 {
22+
return nil, mysql.ErrMalformPacket
23+
}
24+
lenPosName := binary.LittleEndian.Uint32(data[11:15])
25+
if len(data) < 22+int(lenPosName) {
26+
return nil, mysql.ErrMalformPacket
27+
}
28+
29+
return mysql.DecodeMysqlGTIDSet(data[22+lenPosName:])
30+
}

server/resp.go

+21
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package server
22

33
import (
4+
"context"
45
"fmt"
56

67
. "github.com/go-mysql-org/go-mysql/mysql"
8+
"github.com/go-mysql-org/go-mysql/replication"
79
)
810

911
func (c *Conn) writeOK(r *Result) error {
@@ -197,6 +199,23 @@ func (c *Conn) writeFieldValues(fv []FieldValue) error {
197199
return c.WritePacket(data)
198200
}
199201

202+
// see: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication.html
203+
func (c *Conn) writeBinlogEvents(s *replication.BinlogStreamer) error {
204+
for {
205+
ev, err := s.GetEvent(context.Background())
206+
if err != nil {
207+
return err
208+
}
209+
data := make([]byte, 4, 4+len(ev.RawData))
210+
data = append(data, OK_HEADER)
211+
212+
data = append(data, ev.RawData...)
213+
if err := c.WritePacket(data); err != nil {
214+
return err
215+
}
216+
}
217+
}
218+
200219
type noResponse struct{}
201220
type eofResponse struct{}
202221

@@ -220,6 +239,8 @@ func (c *Conn) WriteValue(value interface{}) error {
220239
return c.writeFieldList(v, nil)
221240
case []FieldValue:
222241
return c.writeFieldValues(v)
242+
case *replication.BinlogStreamer:
243+
return c.writeBinlogEvents(v)
223244
case *Stmt:
224245
return c.writePrepare(v)
225246
default:

0 commit comments

Comments
 (0)