diff --git a/canal/sync.go b/canal/sync.go index abd835c96..34a69cb58 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -43,10 +43,6 @@ func (c *Canal) runSyncBinlog() error { savePos := false force := false - // The name of the binlog file received in the fake rotate event. - // It must be preserved until the new position is saved. - fakeRotateLogName := "" - for { ev, err := s.GetEvent(c.ctx) if err != nil { @@ -63,11 +59,16 @@ func (c *Canal) runSyncBinlog() error { if ev.Header.LogPos == 0 { switch e := ev.Event.(type) { case *replication.RotateEvent: - fakeRotateLogName = string(e.NextLogName) + fakeRotateLogName := string(e.NextLogName) c.cfg.Logger.Infof("received fake rotate event, next log name is %s", e.NextLogName) + if fakeRotateLogName != c.master.Position().Name { + c.cfg.Logger.Info("log name changed, the fake rotate event will be handled as a real rotate event") + } else { + continue + } + default: + continue } - - continue } savePos = false @@ -79,11 +80,6 @@ func (c *Canal) runSyncBinlog() error { // next binlog pos pos.Pos = ev.Header.LogPos - // new file name received in the fake rotate event - if fakeRotateLogName != "" { - pos.Name = fakeRotateLogName - } - // We only save position with RotateEvent and XIDEvent. // For RowsEvent, we can't save the position until meeting XIDEvent // which tells the whole transaction is over. @@ -174,7 +170,6 @@ func (c *Canal) runSyncBinlog() error { if savePos { c.master.Update(pos) c.master.UpdateTimestamp(ev.Header.Timestamp) - fakeRotateLogName = "" if err := c.eventHandler.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force); err != nil { return errors.Trace(err)