From ef0bcca6f622f41c8ca5b55dd3639492a1cf6f77 Mon Sep 17 00:00:00 2001 From: Jijun Gao Date: Mon, 12 Dec 2022 10:51:10 +0800 Subject: [PATCH 1/3] handle fake roate event as real if binlog filename changed --- canal/sync.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/canal/sync.go b/canal/sync.go index abd835c96..89c076f21 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -65,9 +65,15 @@ func (c *Canal) runSyncBinlog() error { case *replication.RotateEvent: fakeRotateLogName = string(e.NextLogName) c.cfg.Logger.Infof("received fake rotate event, next log name is %s", e.NextLogName) + if fakeRotateLogName != c.master.Position().Name { + fakeRotateLogName = "" + c.cfg.Logger.Info("log name changed, the fake rotate event will be handled as a real roate event") + } else { + continue + } + default: + continue } - - continue } savePos = false From 1902f25bda1ada1b71cb1b6016f560a2b68d19f1 Mon Sep 17 00:00:00 2001 From: Jijun Gao Date: Mon, 12 Dec 2022 11:04:27 +0800 Subject: [PATCH 2/3] no need to preserve fakeRoteEventLogName if not changed --- canal/sync.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/canal/sync.go b/canal/sync.go index 89c076f21..323dd95c2 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -43,10 +43,6 @@ func (c *Canal) runSyncBinlog() error { savePos := false force := false - // The name of the binlog file received in the fake rotate event. - // It must be preserved until the new position is saved. - fakeRotateLogName := "" - for { ev, err := s.GetEvent(c.ctx) if err != nil { @@ -63,10 +59,9 @@ func (c *Canal) runSyncBinlog() error { if ev.Header.LogPos == 0 { switch e := ev.Event.(type) { case *replication.RotateEvent: - fakeRotateLogName = string(e.NextLogName) + fakeRotateLogName := string(e.NextLogName) c.cfg.Logger.Infof("received fake rotate event, next log name is %s", e.NextLogName) if fakeRotateLogName != c.master.Position().Name { - fakeRotateLogName = "" c.cfg.Logger.Info("log name changed, the fake rotate event will be handled as a real roate event") } else { continue @@ -85,11 +80,6 @@ func (c *Canal) runSyncBinlog() error { // next binlog pos pos.Pos = ev.Header.LogPos - // new file name received in the fake rotate event - if fakeRotateLogName != "" { - pos.Name = fakeRotateLogName - } - // We only save position with RotateEvent and XIDEvent. // For RowsEvent, we can't save the position until meeting XIDEvent // which tells the whole transaction is over. @@ -180,7 +170,6 @@ func (c *Canal) runSyncBinlog() error { if savePos { c.master.Update(pos) c.master.UpdateTimestamp(ev.Header.Timestamp) - fakeRotateLogName = "" if err := c.eventHandler.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force); err != nil { return errors.Trace(err) From bf4f6397a13992dc0121896d718f9911811ff008 Mon Sep 17 00:00:00 2001 From: Jijun Gao Date: Mon, 12 Dec 2022 14:54:20 +0800 Subject: [PATCH 3/3] fix typo --- canal/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/canal/sync.go b/canal/sync.go index 323dd95c2..34a69cb58 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -62,7 +62,7 @@ func (c *Canal) runSyncBinlog() error { fakeRotateLogName := string(e.NextLogName) c.cfg.Logger.Infof("received fake rotate event, next log name is %s", e.NextLogName) if fakeRotateLogName != c.master.Position().Name { - c.cfg.Logger.Info("log name changed, the fake rotate event will be handled as a real roate event") + c.cfg.Logger.Info("log name changed, the fake rotate event will be handled as a real rotate event") } else { continue }