Skip to content

binlog_syncer: reduce the times of Clone of MySQLGTIDSet to speed up #746

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 30, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 27 additions & 29 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ type BinlogSyncerConfig struct {
Dialer client.Dialer

RowsEventDecodeFunc func(*RowsEvent, []byte) error

DiscardGTIDSet bool
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the DiscardGTIDSet should also skip maintaining prevGset and currGset, only skipping output is no benefit to performance

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prevGset and currGset need to maintain for reconnect. Skip output reduce the clone operation

}

// BinlogSyncer syncs binlog event from server.
Expand All @@ -138,6 +140,9 @@ type BinlogSyncer struct {

prevGset, currGset GTIDSet

// instead of GTIDSet.Clone, use this to speed up calculate prevGset
prevMySQLGTIDEvent *GTIDEvent

running bool

ctx context.Context
Expand Down Expand Up @@ -422,6 +427,7 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
b.cfg.Logger.Infof("begin to sync binlog from GTID set %s", gset)

b.prevMySQLGTIDEvent = nil
b.prevGset = gset

b.m.Lock()
Expand Down Expand Up @@ -618,6 +624,7 @@ func (b *BinlogSyncer) retrySync() error {
defer b.m.Unlock()

b.parser.Reset()
b.prevMySQLGTIDEvent = nil

if b.prevGset != nil {
msg := fmt.Sprintf("begin to re-sync from %s", b.prevGset.String())
Expand Down Expand Up @@ -799,28 +806,6 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
return b.currGset.Clone()
}

advanceCurrentGtidSet := func(uuid uuid.UUID, gno int64, domainID uint32, serverID uint32, sequenceNumber uint64) (err error) {
if b.currGset == nil {
b.currGset = b.prevGset.Clone()
}
prev := b.currGset.Clone()
switch gset := b.currGset.(type) {
case *MysqlGTIDSet:
gset.AddGTID(uuid, gno)
case *MariadbGTIDSet:
err = gset.AddSet(&MariadbGTID{DomainID: domainID, ServerID: serverID, SequenceNumber: sequenceNumber})
default:
err = errors.Errorf("unsupported GTIDSet type %T", b.currGset)
}
if err == nil {
// right after reconnect we will see same gtid as we saw before, thus currGset will not get changed
if !b.currGset.Equal(prev) {
b.prevGset = prev
}
}
return err
}

switch event := e.Event.(type) {
case *RotateEvent:
b.nextPos.Name = string(event.NextLogName)
Expand All @@ -830,24 +815,37 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
if b.prevGset == nil {
break
}
if b.currGset == nil {
b.currGset = b.prevGset.Clone()
}
u, _ := uuid.FromBytes(event.SID)
err := advanceCurrentGtidSet(u, event.GNO, 0, 0, 0)
if err != nil {
return errors.Trace(err)
b.currGset.(*MysqlGTIDSet).AddGTID(u, event.GNO)
if b.prevMySQLGTIDEvent != nil {
u, _ = uuid.FromBytes(b.prevMySQLGTIDEvent.SID)
b.prevGset.(*MysqlGTIDSet).AddGTID(u, b.prevMySQLGTIDEvent.GNO)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also consider the case "right after reconnect we will see same gtid as we saw before, thus currGset will not get changed"? (but I don't know when it will happen)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That won't happen, because we will leave the currSet empty when we retry, and the mariadb comment will just stay as it is.

b.currGset = nil

}
b.prevMySQLGTIDEvent = event
case *MariadbGTIDEvent:
if b.prevGset == nil {
break
}
GTID := event.GTID
err := advanceCurrentGtidSet(uuid.Nil, 0, GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)
prev := b.currGset.Clone()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we don't do it for MariaDB?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK seems MariaDB has a simpler representation of GTID, we can leave this improvement to future when we found a bottleneck

err = b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID)
if err != nil {
return errors.Trace(err)
}
// right after reconnect we will see same gtid as we saw before, thus currGset will not get changed
if !b.currGset.Equal(prev) {
b.prevGset = prev
}
case *XIDEvent:
event.GSet = getCurrentGtidSet()
if !b.cfg.DiscardGTIDSet {
event.GSet = getCurrentGtidSet()
}
case *QueryEvent:
event.GSet = getCurrentGtidSet()
if !b.cfg.DiscardGTIDSet {
event.GSet = getCurrentGtidSet()
}
}

needStop := false
Expand Down