@@ -96,12 +96,17 @@ func (c *Canal) runSyncBinlog() error {
96
96
// we only focus row based event
97
97
err = c .handleRowsEvent (ev )
98
98
if err != nil {
99
- e := errors .Cause (err )
100
- // if error is not ErrExcludedTable or ErrTableNotExist or ErrMissingTableMeta, stop canal
101
- if e != ErrExcludedTable &&
102
- e != schema .ErrTableNotExist &&
103
- e != schema .ErrMissingTableMeta {
104
- c .cfg .Logger .Errorf ("handle rows event at (%s, %d) error %v" , pos .Name , curPos , err )
99
+ c .cfg .Logger .Errorf ("handle rows event at (%s, %d) error %v" , pos .Name , curPos , err )
100
+ return errors .Trace (err )
101
+ }
102
+ continue
103
+ case * replication.TransactionPayloadEvent :
104
+ // handle subevent row by row
105
+ ev := ev .Event .(* replication.TransactionPayloadEvent )
106
+ for _ , subEvent := range ev .Events {
107
+ err = c .handleRowsEvent (subEvent )
108
+ if err != nil {
109
+ c .cfg .Logger .Errorf ("handle transaction payload rows event at (%s, %d) error %v" , pos .Name , curPos , err )
105
110
return errors .Trace (err )
106
111
}
107
112
}
@@ -232,11 +237,17 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
232
237
ev := e .Event .(* replication.RowsEvent )
233
238
234
239
// Caveat: table may be altered at runtime.
235
- schema := string (ev .Table .Schema )
236
- table := string (ev .Table .Table )
240
+ schemaName := string (ev .Table .Schema )
241
+ tableName := string (ev .Table .Table )
237
242
238
- t , err := c .GetTable (schema , table )
243
+ t , err := c .GetTable (schemaName , tableName )
239
244
if err != nil {
245
+ e := errors .Cause (err )
246
+ // ignore errors below
247
+ if e == ErrExcludedTable || e == schema .ErrTableNotExist || e == schema .ErrMissingTableMeta {
248
+ err = nil
249
+ }
250
+
240
251
return err
241
252
}
242
253
var action string
0 commit comments