diff --git a/canal/handler.go b/canal/handler.go index 71eccd74c..4ddf03bfb 100644 --- a/canal/handler.go +++ b/canal/handler.go @@ -14,7 +14,7 @@ type EventHandler interface { OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error OnRow(e *RowsEvent) error OnXID(header *replication.EventHeader, nextPos mysql.Position) error - OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error + OnGTID(header *replication.EventHeader, gtidEvent mysql.BinlogGTIDEvent) error // OnPosSynced Use your own way to sync position. When force is true, sync position immediately. OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error String() string @@ -34,7 +34,9 @@ func (h *DummyEventHandler) OnDDL(*replication.EventHeader, mysql.Position, *rep } func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil } func (h *DummyEventHandler) OnXID(*replication.EventHeader, mysql.Position) error { return nil } -func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.GTIDSet) error { return nil } +func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.BinlogGTIDEvent) error { + return nil +} func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error { return nil } diff --git a/canal/sync.go b/canal/sync.go index 7df181226..5523d9fe4 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -1,14 +1,12 @@ package canal import ( - "fmt" "sync/atomic" "time" "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" "github.com/go-mysql-org/go-mysql/schema" - "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/tidb/parser/ast" ) @@ -118,21 +116,11 @@ func (c *Canal) runSyncBinlog() error { c.master.UpdateGTIDSet(e.GSet) } case *replication.MariadbGTIDEvent: - // try to save the GTID later - gtid, err := mysql.ParseMariadbGTIDSet(e.GTID.String()) - if err != nil { - return errors.Trace(err) - } - if err := c.eventHandler.OnGTID(ev.Header, gtid); err != nil { + if err := c.eventHandler.OnGTID(ev.Header, e); err != nil { return errors.Trace(err) } case *replication.GTIDEvent: - u, _ := uuid.FromBytes(e.SID) - gtid, err := mysql.ParseMysqlGTIDSet(fmt.Sprintf("%s:%d", u.String(), e.GNO)) - if err != nil { - return errors.Trace(err) - } - if err := c.eventHandler.OnGTID(ev.Header, gtid); err != nil { + if err := c.eventHandler.OnGTID(ev.Header, e); err != nil { return errors.Trace(err) } case *replication.QueryEvent: diff --git a/mysql/gtid.go b/mysql/gtid.go index ae519ef25..b73bd7eda 100644 --- a/mysql/gtid.go +++ b/mysql/gtid.go @@ -29,3 +29,7 @@ func ParseGTIDSet(flavor string, s string) (GTIDSet, error) { return nil, errors.Errorf("invalid flavor %s", flavor) } } + +type BinlogGTIDEvent interface { + GTIDNext() (GTIDSet, error) +} diff --git a/replication/event.go b/replication/event.go index 264b230ca..566efedaf 100644 --- a/replication/event.go +++ b/replication/event.go @@ -462,6 +462,14 @@ func (e *GTIDEvent) Dump(w io.Writer) { fmt.Fprintln(w) } +func (e *GTIDEvent) GTIDNext() (GTIDSet, error) { + u, err := uuid.FromBytes(e.SID) + if err != nil { + return nil, err + } + return ParseMysqlGTIDSet(strings.Join([]string{u.String(), strconv.FormatInt(e.GNO, 10)}, ":")) +} + // ImmediateCommitTime returns the commit time of this trx on the immediate server // or zero time if not available. func (e *GTIDEvent) ImmediateCommitTime() time.Time { @@ -625,6 +633,10 @@ func (e *MariadbGTIDEvent) Dump(w io.Writer) { fmt.Fprintln(w) } +func (e *MariadbGTIDEvent) GTIDNext() (GTIDSet, error) { + return ParseMariadbGTIDSet(e.GTID.String()) +} + type MariadbGTIDListEvent struct { GTIDs []MariadbGTID } diff --git a/replication/event_test.go b/replication/event_test.go index 78bddc345..a903ab9b4 100644 --- a/replication/event_test.go +++ b/replication/event_test.go @@ -1,8 +1,10 @@ package replication import ( + "fmt" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -49,6 +51,9 @@ func TestMariadbGTIDEvent(t *testing.T) { require.True(t, ev.IsStandalone()) require.True(t, ev.IsGroupCommit()) require.Equal(t, uint64(0x1716151413121110), ev.CommitID) + set, err := ev.GTIDNext() + require.NoError(t, err) + require.Equal(t, "70975786-0-578437695752307201", set.String()) } func TestGTIDEventMysql8NewFields(t *testing.T) { @@ -59,6 +64,9 @@ func TestGTIDEventMysql8NewFields(t *testing.T) { expectTransactionLength uint64 expectImmediateServerVersion uint32 expectOriginalServerVersion uint32 + expectGTID string + expectSequnceNumber int64 + expectLastCommitted int64 }{ { // master: mysql-5.7, slave: mysql-8.0 @@ -68,6 +76,9 @@ func TestGTIDEventMysql8NewFields(t *testing.T) { expectTransactionLength: 965, expectImmediateServerVersion: 80019, expectOriginalServerVersion: 0, + expectGTID: "5aa72a7f-44a8-11ea-947f-0242ac190002:258", + expectSequnceNumber: 119, + expectLastCommitted: 118, }, { // mysql-5.7 only @@ -77,6 +88,9 @@ func TestGTIDEventMysql8NewFields(t *testing.T) { expectTransactionLength: 0, expectImmediateServerVersion: 0, expectOriginalServerVersion: 0, + expectGTID: "5aa72a7f-44a8-11ea-947f-0242ac190002:259", + expectSequnceNumber: 54, + expectLastCommitted: 53, }, { // mysql-8.0 only @@ -86,10 +100,13 @@ func TestGTIDEventMysql8NewFields(t *testing.T) { expectTransactionLength: 963, expectImmediateServerVersion: 80019, expectOriginalServerVersion: 80019, + expectGTID: "5ccc1033-44a8-11ea-bd59-0242ac190003:119", + expectSequnceNumber: 121, + expectLastCommitted: 120, }, } - for _, tc := range testcases { + for i, tc := range testcases { ev := new(GTIDEvent) err := ev.Decode(tc.data) require.NoError(t, err) @@ -98,6 +115,11 @@ func TestGTIDEventMysql8NewFields(t *testing.T) { require.Equal(t, tc.expectTransactionLength, ev.TransactionLength) require.Equal(t, tc.expectImmediateServerVersion, ev.ImmediateServerVersion) require.Equal(t, tc.expectOriginalServerVersion, ev.OriginalServerVersion) + set, err := ev.GTIDNext() + require.NoError(t, err) + assert.Equal(t, tc.expectGTID, set.String(), fmt.Sprintf("testcase: %d", i)) + assert.Equal(t, tc.expectSequnceNumber, ev.SequenceNumber, fmt.Sprintf("testcase: %d", i)) + assert.Equal(t, tc.expectLastCommitted, ev.LastCommitted, fmt.Sprintf("testcase: %d", i)) } }