Skip to content

Commit 7685eee

Browse files
authored
Merge pull request #740 from BLAZZ/master
EventHandlerV2 support handle event with replication.EventHeader
2 parents 6d8a585 + 778f99e commit 7685eee

File tree

5 files changed

+31
-25
lines changed

5 files changed

+31
-25
lines changed

canal/canal.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func (c *Canal) Close() {
258258
c.conn = nil
259259
c.connLock.Unlock()
260260

261-
_ = c.eventHandler.OnPosSynced(c.master.Position(), c.master.GTIDSet(), true)
261+
_ = c.eventHandler.OnPosSynced(nil, c.master.Position(), c.master.GTIDSet(), true)
262262
}
263263

264264
func (c *Canal) WaitDumpDone() <-chan struct{} {

canal/canal_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func (h *testEventHandler) String() string {
123123
return "testEventHandler"
124124
}
125125

126-
func (h *testEventHandler) OnPosSynced(p mysql.Position, set mysql.GTIDSet, f bool) error {
126+
func (h *testEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
127127
return nil
128128
}
129129

canal/dump.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (c *Canal) dump() error {
176176
pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)}
177177
c.master.Update(pos)
178178
c.master.UpdateGTIDSet(h.gset)
179-
if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), true); err != nil {
179+
if err := c.eventHandler.OnPosSynced(nil, pos, c.master.GTIDSet(), true); err != nil {
180180
return errors.Trace(err)
181181
}
182182
var startPos fmt.Stringer = pos

canal/handler.go

+19-13
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,38 @@ import (
66
)
77

88
type EventHandler interface {
9-
OnRotate(rotateEvent *replication.RotateEvent) error
9+
OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error
1010
// OnTableChanged is called when the table is created, altered, renamed or dropped.
1111
// You need to clear the associated data like cache with the table.
1212
// It will be called before OnDDL.
13-
OnTableChanged(schema string, table string) error
14-
OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error
13+
OnTableChanged(header *replication.EventHeader, schema string, table string) error
14+
OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error
1515
OnRow(e *RowsEvent) error
16-
OnXID(nextPos mysql.Position) error
17-
OnGTID(gtid mysql.GTIDSet) error
16+
OnXID(header *replication.EventHeader, nextPos mysql.Position) error
17+
OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error
1818
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
19-
OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error
19+
OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
2020
String() string
2121
}
2222

2323
type DummyEventHandler struct {
2424
}
2525

26-
func (h *DummyEventHandler) OnRotate(*replication.RotateEvent) error { return nil }
27-
func (h *DummyEventHandler) OnTableChanged(schema string, table string) error { return nil }
28-
func (h *DummyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
26+
func (h *DummyEventHandler) OnRotate(*replication.EventHeader, *replication.RotateEvent) error {
27+
return nil
28+
}
29+
func (h *DummyEventHandler) OnTableChanged(*replication.EventHeader, string, string) error {
30+
return nil
31+
}
32+
func (h *DummyEventHandler) OnDDL(*replication.EventHeader, mysql.Position, *replication.QueryEvent) error {
33+
return nil
34+
}
35+
func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil }
36+
func (h *DummyEventHandler) OnXID(*replication.EventHeader, mysql.Position) error { return nil }
37+
func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.GTIDSet) error { return nil }
38+
func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error {
2939
return nil
3040
}
31-
func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil }
32-
func (h *DummyEventHandler) OnXID(mysql.Position) error { return nil }
33-
func (h *DummyEventHandler) OnGTID(mysql.GTIDSet) error { return nil }
34-
func (h *DummyEventHandler) OnPosSynced(mysql.Position, mysql.GTIDSet, bool) error { return nil }
3541

3642
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }
3743

canal/sync.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func (c *Canal) runSyncBinlog() error {
9595
c.cfg.Logger.Infof("rotate binlog to %s", pos)
9696
savePos = true
9797
force = true
98-
if err = c.eventHandler.OnRotate(e); err != nil {
98+
if err = c.eventHandler.OnRotate(ev.Header, e); err != nil {
9999
return errors.Trace(err)
100100
}
101101
case *replication.RowsEvent:
@@ -115,7 +115,7 @@ func (c *Canal) runSyncBinlog() error {
115115
case *replication.XIDEvent:
116116
savePos = true
117117
// try to save the position later
118-
if err := c.eventHandler.OnXID(pos); err != nil {
118+
if err := c.eventHandler.OnXID(ev.Header, pos); err != nil {
119119
return errors.Trace(err)
120120
}
121121
if e.GSet != nil {
@@ -127,7 +127,7 @@ func (c *Canal) runSyncBinlog() error {
127127
if err != nil {
128128
return errors.Trace(err)
129129
}
130-
if err := c.eventHandler.OnGTID(gtid); err != nil {
130+
if err := c.eventHandler.OnGTID(ev.Header, gtid); err != nil {
131131
return errors.Trace(err)
132132
}
133133
case *replication.GTIDEvent:
@@ -136,7 +136,7 @@ func (c *Canal) runSyncBinlog() error {
136136
if err != nil {
137137
return errors.Trace(err)
138138
}
139-
if err := c.eventHandler.OnGTID(gtid); err != nil {
139+
if err := c.eventHandler.OnGTID(ev.Header, gtid); err != nil {
140140
return errors.Trace(err)
141141
}
142142
case *replication.QueryEvent:
@@ -151,15 +151,15 @@ func (c *Canal) runSyncBinlog() error {
151151
if node.db == "" {
152152
node.db = string(e.Schema)
153153
}
154-
if err = c.updateTable(node.db, node.table); err != nil {
154+
if err = c.updateTable(ev.Header, node.db, node.table); err != nil {
155155
return errors.Trace(err)
156156
}
157157
}
158158
if len(nodes) > 0 {
159159
savePos = true
160160
force = true
161161
// Now we only handle Table Changed DDL, maybe we will support more later.
162-
if err = c.eventHandler.OnDDL(pos, e); err != nil {
162+
if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
163163
return errors.Trace(err)
164164
}
165165
}
@@ -176,7 +176,7 @@ func (c *Canal) runSyncBinlog() error {
176176
c.master.UpdateTimestamp(ev.Header.Timestamp)
177177
fakeRotateLogName = ""
178178

179-
if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), force); err != nil {
179+
if err := c.eventHandler.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force); err != nil {
180180
return errors.Trace(err)
181181
}
182182
}
@@ -228,10 +228,10 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) {
228228
return ns
229229
}
230230

231-
func (c *Canal) updateTable(db, table string) (err error) {
231+
func (c *Canal) updateTable(header *replication.EventHeader, db, table string) (err error) {
232232
c.ClearTableCache([]byte(db), []byte(table))
233233
c.cfg.Logger.Infof("table structure changed, clear table cache: %s.%s\n", db, table)
234-
if err = c.eventHandler.OnTableChanged(db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist {
234+
if err = c.eventHandler.OnTableChanged(header, db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist {
235235
return errors.Trace(err)
236236
}
237237
return

0 commit comments

Comments
 (0)