Skip to content

Fix bug in handling sub events of replication.TransactionPayloadEvent #875

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 98 additions & 91 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ func (c *Canal) runSyncBinlog() error {
return err
}

savePos := false
force := false

for {
ev, err := s.GetEvent(c.ctx)
if err != nil {
Expand Down Expand Up @@ -69,110 +66,120 @@ func (c *Canal) runSyncBinlog() error {
}
}

savePos = false
force = false
pos := c.master.Position()
err = c.handleEvent(ev)
if err != nil {
return err
}
}
}

curPos := pos.Pos
func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
savePos := false
force := false
pos := c.master.Position()
var err error

// next binlog pos
pos.Pos = ev.Header.LogPos
curPos := pos.Pos

// We only save position with RotateEvent and XIDEvent.
// For RowsEvent, we can't save the position until meeting XIDEvent
// which tells the whole transaction is over.
// TODO: If we meet any DDL query, we must save too.
switch e := ev.Event.(type) {
case *replication.RotateEvent:
pos.Name = string(e.NextLogName)
pos.Pos = uint32(e.Position)
c.cfg.Logger.Infof("rotate binlog to %s", pos)
savePos = true
force = true
if err = c.eventHandler.OnRotate(ev.Header, e); err != nil {
return errors.Trace(err)
}
case *replication.RowsEvent:
// we only focus row based event
err = c.handleRowsEvent(ev)
// next binlog pos
pos.Pos = ev.Header.LogPos

// We only save position with RotateEvent and XIDEvent.
// For RowsEvent, we can't save the position until meeting XIDEvent
// which tells the whole transaction is over.
// TODO: If we meet any DDL query, we must save too.
switch e := ev.Event.(type) {
case *replication.RotateEvent:
pos.Name = string(e.NextLogName)
pos.Pos = uint32(e.Position)
c.cfg.Logger.Infof("rotate binlog to %s", pos)
savePos = true
force = true
if err = c.eventHandler.OnRotate(ev.Header, e); err != nil {
return errors.Trace(err)
}
case *replication.RowsEvent:
// we only focus row based event
err = c.handleRowsEvent(ev)
if err != nil {
c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
return errors.Trace(err)
}
return nil
case *replication.TransactionPayloadEvent:
// handle subevent row by row
ev := ev.Event.(*replication.TransactionPayloadEvent)
for _, subEvent := range ev.Events {
err = c.handleEvent(subEvent)
if err != nil {
c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
c.cfg.Logger.Errorf("handle transaction payload subevent at (%s, %d) error %v", pos.Name, curPos, err)
return errors.Trace(err)
}
continue
case *replication.TransactionPayloadEvent:
// handle subevent row by row
ev := ev.Event.(*replication.TransactionPayloadEvent)
for _, subEvent := range ev.Events {
err = c.handleRowsEvent(subEvent)
if err != nil {
c.cfg.Logger.Errorf("handle transaction payload rows event at (%s, %d) error %v", pos.Name, curPos, err)
}
return nil
case *replication.XIDEvent:
savePos = true
// try to save the position later
if err := c.eventHandler.OnXID(ev.Header, pos); err != nil {
return errors.Trace(err)
}
if e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
}
case *replication.MariadbGTIDEvent:
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
return errors.Trace(err)
}
case *replication.GTIDEvent:
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
return errors.Trace(err)
}
case *replication.RowsQueryEvent:
if err := c.eventHandler.OnRowsQueryEvent(e); err != nil {
return errors.Trace(err)
}
case *replication.QueryEvent:
stmts, _, err := c.parser.Parse(string(e.Query), "", "")
if err != nil {
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
return nil
}
for _, stmt := range stmts {
nodes := parseStmt(stmt)
for _, node := range nodes {
if node.db == "" {
node.db = string(e.Schema)
}
if err = c.updateTable(ev.Header, node.db, node.table); err != nil {
return errors.Trace(err)
}
}
continue
case *replication.XIDEvent:
savePos = true
// try to save the position later
if err := c.eventHandler.OnXID(ev.Header, pos); err != nil {
return errors.Trace(err)
}
if e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
}
case *replication.MariadbGTIDEvent:
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
return errors.Trace(err)
}
case *replication.GTIDEvent:
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
return errors.Trace(err)
}
case *replication.RowsQueryEvent:
if err := c.eventHandler.OnRowsQueryEvent(e); err != nil {
return errors.Trace(err)
}
case *replication.QueryEvent:
stmts, _, err := c.parser.Parse(string(e.Query), "", "")
if err != nil {
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
continue
}
for _, stmt := range stmts {
nodes := parseStmt(stmt)
for _, node := range nodes {
if node.db == "" {
node.db = string(e.Schema)
}
if err = c.updateTable(ev.Header, node.db, node.table); err != nil {
return errors.Trace(err)
}
}
if len(nodes) > 0 {
savePos = true
force = true
// Now we only handle Table Changed DDL, maybe we will support more later.
if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
return errors.Trace(err)
}
if len(nodes) > 0 {
savePos = true
force = true
// Now we only handle Table Changed DDL, maybe we will support more later.
if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
return errors.Trace(err)
}
}
if savePos && e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
}
default:
continue
}
if savePos && e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
}
default:
return nil
}

if savePos {
c.master.Update(pos)
c.master.UpdateTimestamp(ev.Header.Timestamp)
if savePos {
c.master.Update(pos)
c.master.UpdateTimestamp(ev.Header.Timestamp)

if err := c.eventHandler.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force); err != nil {
return errors.Trace(err)
}
if err := c.eventHandler.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force); err != nil {
return errors.Trace(err)
}
}

return nil
}

type node struct {
Expand Down