Skip to content

Adding replication protocol support to mysql server implementation #759

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jan 16, 2023
15 changes: 14 additions & 1 deletion replication/binlogstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,24 @@ func (s *BinlogStreamer) closeWithError(err error) {
}
}

func newBinlogStreamer() *BinlogStreamer {
func NewBinlogStreamer() *BinlogStreamer {
s := new(BinlogStreamer)

s.ch = make(chan *BinlogEvent, 10240)
s.ech = make(chan error, 4)

return s
}

func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error {
select {
case s.ch <- ev:
return nil
case err := <-s.ech:
return err
}
}

func (s *BinlogStreamer) AddErrorToStreamer(err error) {
s.ech <- err
}
2 changes: 1 addition & 1 deletion replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (b *BinlogSyncer) prepare() error {
func (b *BinlogSyncer) startDumpStream() *BinlogStreamer {
b.running = true

s := newBinlogStreamer()
s := NewBinlogStreamer()

b.wg.Add(1)
go b.onStream(s)
Expand Down
55 changes: 55 additions & 0 deletions server/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

. "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/siddontang/go/hack"
)

Expand All @@ -30,6 +31,13 @@ type Handler interface {
HandleOtherCommand(cmd byte, data []byte) error
}

type ReplicationHandler interface {
//handle Replication command
HandleRegisterSlave(data []byte) error
HandleBinlogDump(pos *Position, s *replication.BinlogStreamer)
HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet, s *replication.BinlogStreamer)
Copy link

@ostinru ostinru Jan 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that s *replication.BinlogStreamer shouldn't be here. It is up to users who will implement ReplicationHandler what to do with these events.

PS: feel free to disagree

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I don't understand, you mean we can expose Conn to user and user directly write binlog event as []byte to Conn?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh... I worded it really badly.

I will agree with you that it is not a good idea to expose Conn.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency we can

  • move s *replication.BinlogStreamer to return values
  • add error to return values
  • remove go in Conn.dispatch()... or document that this method will be called form its own coroutine.

}

func (c *Conn) HandleCommand() error {
if c.Conn == nil {
return fmt.Errorf("connection closed")
Expand Down Expand Up @@ -131,6 +139,39 @@ func (c *Conn) dispatch(data []byte) interface{} {
}

return eofResponse{}
case COM_REGISTER_SLAVE:
if h, ok := c.h.(ReplicationHandler); ok {
if err := h.HandleRegisterSlave(data); err != nil {
return err
}
return nil
} else {
return fmt.Errorf("the handler does not support replication protocol, use ReplicationHandler instead")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it is better to fall back to

return c.h.HandleOtherCommand(cmd, data)

}
case COM_BINLOG_DUMP:
if h, ok := c.h.(ReplicationHandler); ok {
pos, _ := parseBinlogDump(data)
s := replication.NewBinlogStreamer()
go h.HandleBinlogDump(pos, s)

return s
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another choice is that this Conn never returns, we directly call writeBinlogEvents in a loop. I'm not sure what should we do if the Conn is returned and MySQL client send another command to this connection

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting question. mysql-clients can be disconnected when they are not responsive (see net_write_timeout and net_read_timeout). We should check how replication protocol works.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reminding us, there're two questions

  1. I wrongly thought the connection will be re-used for next command at first, and want to remind author to check what's the behaviour if the connection is both serving a binlog stream and client's request. Now I see writeBinlogEvents won't do that.
  2. mysql client may set heartbeat interval and binlog streamer should append heartbeat event when the stream is idle for the interval. I think it's OK to implement it later because some use case of this feature will not let stream be idle for a long time.

} else {
return fmt.Errorf("the handler does not support replication protocol, use ReplicationHandler instead")
}
case COM_BINLOG_DUMP_GTID:
if h, ok := c.h.(ReplicationHandler); ok {
gtidSet, err := parseBinlogDumpGTID(data)
if err != nil {
return err
}

s := replication.NewBinlogStreamer()
go h.HandleBinlogDumpGTID(gtidSet, s)

return s
} else {
return fmt.Errorf("the handler does not support replication protocol, use ReplicationHandler instead")
}
default:
return c.h.HandleOtherCommand(cmd, data)
}
Expand All @@ -139,6 +180,10 @@ func (c *Conn) dispatch(data []byte) interface{} {
type EmptyHandler struct {
}

type EmptyReplicationHandler struct {
EmptyHandler
}

func (h EmptyHandler) UseDB(dbName string) error {
return nil
}
Expand All @@ -160,6 +205,16 @@ func (h EmptyHandler) HandleStmtClose(context interface{}) error {
return nil
}

func (h EmptyReplicationHandler) HandleRegisterSlave(data []byte) error {
return fmt.Errorf("not supported now")
}

func (h EmptyReplicationHandler) HandleBinlogDump(pos *Position, r *replication.BinlogStreamer) {
}

func (h EmptyReplicationHandler) HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet, r *replication.BinlogStreamer) {
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest adding r.close() here, just to gracefully release resources.


func (h EmptyHandler) HandleOtherCommand(cmd byte, data []byte) error {
return NewError(
ER_UNKNOWN_ERROR,
Expand Down
21 changes: 21 additions & 0 deletions server/replication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package server

import (
"encoding/binary"

"github.com/go-mysql-org/go-mysql/mysql"
)

func parseBinlogDump(data []byte) (*mysql.Position, error) {
var p mysql.Position
p.Pos = binary.LittleEndian.Uint32(data[0:4])
p.Name = string(data[10:])

return &p, nil
}

func parseBinlogDumpGTID(data []byte) (*mysql.MysqlGTIDSet, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you plan to support MariaDB GTID set? you can see ParseMariadbGTID

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MariaDB doesn't implement COM_BINLOG_DUMP_GTID - so, we can assume that this message sent by MySQL.

https://github.com/MariaDB/server/blob/10.9/include/mysql_com.h#L118

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I mean if there's not too much work to do we can support MariaDB by the way. But as you said MariaDB does not use COM_BINLOG_DUMP_GTID so it may not be a trivial work

lenPosName := binary.LittleEndian.Uint32(data[11:15])

return mysql.DecodeMysqlGTIDSet(data[22+lenPosName:])
}
24 changes: 24 additions & 0 deletions server/resp.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package server

import (
"context"
"fmt"
"time"

. "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
)

func (c *Conn) writeOK(r *Result) error {
Expand Down Expand Up @@ -197,6 +200,25 @@ func (c *Conn) writeFieldValues(fv []FieldValue) error {
return c.WritePacket(data)
}

func (c *Conn) writeBinlogEvents(s *replication.BinlogStreamer) error {
for {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ev, err := s.GetEvent(ctx)
cancel()

if err == context.DeadlineExceeded {
continue
}
data := make([]byte, 4, 32)
data = append(data, OK_HEADER)

data = append(data, ev.RawData...)
if err := c.WritePacket(data); err != nil {
return err
}
}
}

type noResponse struct{}
type eofResponse struct{}

Expand All @@ -220,6 +242,8 @@ func (c *Conn) WriteValue(value interface{}) error {
return c.writeFieldList(v, nil)
case []FieldValue:
return c.writeFieldValues(v)
case *replication.BinlogStreamer:
return c.writeBinlogEvents(v)
case *Stmt:
return c.writePrepare(v)
default:
Expand Down