Skip to content

Commit 8206ccd

Browse files
authored
Merge pull request #746 from GMHDBJD/speedUpMySQLGTID
binlog_syncer: reduce the times of Clone of MySQLGTIDSet to speed up
2 parents 7685eee + eb89479 commit 8206ccd

File tree

1 file changed

+27
-29
lines changed

1 file changed

+27
-29
lines changed

replication/binlogsyncer.go

+27-29
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ type BinlogSyncerConfig struct {
120120
Dialer client.Dialer
121121

122122
RowsEventDecodeFunc func(*RowsEvent, []byte) error
123+
124+
DiscardGTIDSet bool
123125
}
124126

125127
// BinlogSyncer syncs binlog event from server.
@@ -138,6 +140,9 @@ type BinlogSyncer struct {
138140

139141
prevGset, currGset GTIDSet
140142

143+
// instead of GTIDSet.Clone, use this to speed up calculate prevGset
144+
prevMySQLGTIDEvent *GTIDEvent
145+
141146
running bool
142147

143148
ctx context.Context
@@ -422,6 +427,7 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
422427
func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
423428
b.cfg.Logger.Infof("begin to sync binlog from GTID set %s", gset)
424429

430+
b.prevMySQLGTIDEvent = nil
425431
b.prevGset = gset
426432

427433
b.m.Lock()
@@ -618,6 +624,7 @@ func (b *BinlogSyncer) retrySync() error {
618624
defer b.m.Unlock()
619625

620626
b.parser.Reset()
627+
b.prevMySQLGTIDEvent = nil
621628

622629
if b.prevGset != nil {
623630
msg := fmt.Sprintf("begin to re-sync from %s", b.prevGset.String())
@@ -799,28 +806,6 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
799806
return b.currGset.Clone()
800807
}
801808

802-
advanceCurrentGtidSet := func(uuid uuid.UUID, gno int64, domainID uint32, serverID uint32, sequenceNumber uint64) (err error) {
803-
if b.currGset == nil {
804-
b.currGset = b.prevGset.Clone()
805-
}
806-
prev := b.currGset.Clone()
807-
switch gset := b.currGset.(type) {
808-
case *MysqlGTIDSet:
809-
gset.AddGTID(uuid, gno)
810-
case *MariadbGTIDSet:
811-
err = gset.AddSet(&MariadbGTID{DomainID: domainID, ServerID: serverID, SequenceNumber: sequenceNumber})
812-
default:
813-
err = errors.Errorf("unsupported GTIDSet type %T", b.currGset)
814-
}
815-
if err == nil {
816-
// right after reconnect we will see same gtid as we saw before, thus currGset will not get changed
817-
if !b.currGset.Equal(prev) {
818-
b.prevGset = prev
819-
}
820-
}
821-
return err
822-
}
823-
824809
switch event := e.Event.(type) {
825810
case *RotateEvent:
826811
b.nextPos.Name = string(event.NextLogName)
@@ -830,24 +815,37 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
830815
if b.prevGset == nil {
831816
break
832817
}
818+
if b.currGset == nil {
819+
b.currGset = b.prevGset.Clone()
820+
}
833821
u, _ := uuid.FromBytes(event.SID)
834-
err := advanceCurrentGtidSet(u, event.GNO, 0, 0, 0)
835-
if err != nil {
836-
return errors.Trace(err)
822+
b.currGset.(*MysqlGTIDSet).AddGTID(u, event.GNO)
823+
if b.prevMySQLGTIDEvent != nil {
824+
u, _ = uuid.FromBytes(b.prevMySQLGTIDEvent.SID)
825+
b.prevGset.(*MysqlGTIDSet).AddGTID(u, b.prevMySQLGTIDEvent.GNO)
837826
}
827+
b.prevMySQLGTIDEvent = event
838828
case *MariadbGTIDEvent:
839829
if b.prevGset == nil {
840830
break
841831
}
842-
GTID := event.GTID
843-
err := advanceCurrentGtidSet(uuid.Nil, 0, GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)
832+
prev := b.currGset.Clone()
833+
err = b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID)
844834
if err != nil {
845835
return errors.Trace(err)
846836
}
837+
// right after reconnect we will see same gtid as we saw before, thus currGset will not get changed
838+
if !b.currGset.Equal(prev) {
839+
b.prevGset = prev
840+
}
847841
case *XIDEvent:
848-
event.GSet = getCurrentGtidSet()
842+
if !b.cfg.DiscardGTIDSet {
843+
event.GSet = getCurrentGtidSet()
844+
}
849845
case *QueryEvent:
850-
event.GSet = getCurrentGtidSet()
846+
if !b.cfg.DiscardGTIDSet {
847+
event.GSet = getCurrentGtidSet()
848+
}
851849
}
852850

853851
needStop := false

0 commit comments

Comments
 (0)