From 72a61b9919c2974b7c9e604f0243626b6cd1de20 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Tue, 29 Nov 2022 21:11:35 +0800 Subject: [PATCH 1/2] Reduce the times of clones of MySQLGTIDSet --- replication/binlogsyncer.go | 59 ++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 0c434c21b..7da15ca3a 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -120,6 +120,8 @@ type BinlogSyncerConfig struct { Dialer client.Dialer RowsEventDecodeFunc func(*RowsEvent, []byte) error + + DiscardGTIDSet bool } // BinlogSyncer syncs binlog event from server. @@ -138,6 +140,9 @@ type BinlogSyncer struct { prevGset, currGset GTIDSet + // instead of GTIDSet.Clone, use this to speed up calculate prevGset + prevMySQLGTIDEvent *GTIDEvent + running bool ctx context.Context @@ -422,6 +427,7 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) { func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) { b.cfg.Logger.Infof("begin to sync binlog from GTID set %s", gset) + b.prevMySQLGTIDEvent = nil b.prevGset = gset b.m.Lock() @@ -618,6 +624,7 @@ func (b *BinlogSyncer) retrySync() error { defer b.m.Unlock() b.parser.Reset() + b.prevMySQLGTIDEvent = nil if b.prevGset != nil { msg := fmt.Sprintf("begin to re-sync from %s", b.prevGset.String()) @@ -799,28 +806,6 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { return b.currGset.Clone() } - advanceCurrentGtidSet := func(uuid uuid.UUID, gno int64, domainID uint32, serverID uint32, sequenceNumber uint64) (err error) { - if b.currGset == nil { - b.currGset = b.prevGset.Clone() - } - prev := b.currGset.Clone() - switch gset := b.currGset.(type) { - case *MysqlGTIDSet: - gset.AddGTID(uuid, gno) - case *MariadbGTIDSet: - err = gset.AddSet(&MariadbGTID{DomainID: domainID, ServerID: serverID, SequenceNumber: sequenceNumber}) - default: - err = errors.Errorf("unsupported GTIDSet type %T", b.currGset) - } - if err == nil { - // right after reconnect we will see same gtid as we saw before, thus currGset will not get changed - if !b.currGset.Equal(prev) { - b.prevGset = prev - } - } - return err - } - switch event := e.Event.(type) { case *RotateEvent: b.nextPos.Name = string(event.NextLogName) @@ -830,24 +815,38 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { if b.prevGset == nil { break } + if b.currGset == nil { + b.currGset = b.prevGset.Clone() + } u, _ := uuid.FromBytes(event.SID) - err := advanceCurrentGtidSet(u, event.GNO, 0, 0, 0) - if err != nil { - return errors.Trace(err) + b.currGset.(*MysqlGTIDSet).AddGTID(u, event.GNO) + if b.prevMySQLGTIDEvent != nil { + u, _ = uuid.FromBytes(b.prevMySQLGTIDEvent.SID) + b.prevGset.(*MysqlGTIDSet).AddGTID(u, b.prevMySQLGTIDEvent.GNO) } + b.prevMySQLGTIDEvent = event case *MariadbGTIDEvent: if b.prevGset == nil { break } - GTID := event.GTID - err := advanceCurrentGtidSet(uuid.Nil, 0, GTID.DomainID, GTID.ServerID, GTID.SequenceNumber) - if err != nil { + prev := b.currGset.Clone() + err = b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID) + if err == nil { + // right after reconnect we will see same gtid as we saw before, thus currGset will not get changed + if !b.currGset.Equal(prev) { + b.prevGset = prev + } + } else { return errors.Trace(err) } case *XIDEvent: - event.GSet = getCurrentGtidSet() + if !b.cfg.DiscardGTIDSet { + event.GSet = getCurrentGtidSet() + } case *QueryEvent: - event.GSet = getCurrentGtidSet() + if !b.cfg.DiscardGTIDSet { + event.GSet = getCurrentGtidSet() + } } needStop := false From 5434d9b8dd66db0309a61524b78243aa8b1aac76 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Tue, 29 Nov 2022 21:19:19 +0800 Subject: [PATCH 2/2] update --- replication/binlogsyncer.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 7da15ca3a..7421e102b 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -831,14 +831,13 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { } prev := b.currGset.Clone() err = b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID) - if err == nil { - // right after reconnect we will see same gtid as we saw before, thus currGset will not get changed - if !b.currGset.Equal(prev) { - b.prevGset = prev - } - } else { + if err != nil { return errors.Trace(err) } + // right after reconnect we will see same gtid as we saw before, thus currGset will not get changed + if !b.currGset.Equal(prev) { + b.prevGset = prev + } case *XIDEvent: if !b.cfg.DiscardGTIDSet { event.GSet = getCurrentGtidSet()