Skip to content

Commit 2ed1ac5

Browse files
committed
add gtid event interface
1 parent 09a16f2 commit 2ed1ac5

File tree

5 files changed

+45
-17
lines changed

5 files changed

+45
-17
lines changed

canal/handler.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type EventHandler interface {
1414
OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error
1515
OnRow(e *RowsEvent) error
1616
OnXID(header *replication.EventHeader, nextPos mysql.Position) error
17-
OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error
17+
OnGTID(header *replication.EventHeader, gtidEvent mysql.BinlogGTIDEvent) error
1818
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
1919
OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
2020
String() string
@@ -34,7 +34,9 @@ func (h *DummyEventHandler) OnDDL(*replication.EventHeader, mysql.Position, *rep
3434
}
3535
func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil }
3636
func (h *DummyEventHandler) OnXID(*replication.EventHeader, mysql.Position) error { return nil }
37-
func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.GTIDSet) error { return nil }
37+
func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.BinlogGTIDEvent) error {
38+
return nil
39+
}
3840
func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error {
3941
return nil
4042
}

canal/sync.go

+2-14
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
package canal
22

33
import (
4-
"fmt"
54
"sync/atomic"
65
"time"
76

87
"github.com/go-mysql-org/go-mysql/mysql"
98
"github.com/go-mysql-org/go-mysql/replication"
109
"github.com/go-mysql-org/go-mysql/schema"
11-
"github.com/google/uuid"
1210
"github.com/pingcap/errors"
1311
"github.com/pingcap/tidb/parser/ast"
1412
)
@@ -118,21 +116,11 @@ func (c *Canal) runSyncBinlog() error {
118116
c.master.UpdateGTIDSet(e.GSet)
119117
}
120118
case *replication.MariadbGTIDEvent:
121-
// try to save the GTID later
122-
gtid, err := mysql.ParseMariadbGTIDSet(e.GTID.String())
123-
if err != nil {
124-
return errors.Trace(err)
125-
}
126-
if err := c.eventHandler.OnGTID(ev.Header, gtid); err != nil {
119+
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
127120
return errors.Trace(err)
128121
}
129122
case *replication.GTIDEvent:
130-
u, _ := uuid.FromBytes(e.SID)
131-
gtid, err := mysql.ParseMysqlGTIDSet(fmt.Sprintf("%s:%d", u.String(), e.GNO))
132-
if err != nil {
133-
return errors.Trace(err)
134-
}
135-
if err := c.eventHandler.OnGTID(ev.Header, gtid); err != nil {
123+
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
136124
return errors.Trace(err)
137125
}
138126
case *replication.QueryEvent:

mysql/gtid.go

+4
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,7 @@ func ParseGTIDSet(flavor string, s string) (GTIDSet, error) {
2929
return nil, errors.Errorf("invalid flavor %s", flavor)
3030
}
3131
}
32+
33+
type BinlogGTIDEvent interface {
34+
GTIDNext() (GTIDSet, error)
35+
}

replication/event.go

+12
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,14 @@ func (e *GTIDEvent) Dump(w io.Writer) {
462462
fmt.Fprintln(w)
463463
}
464464

465+
func (e *GTIDEvent) GTIDNext() (GTIDSet, error) {
466+
u, err := uuid.FromBytes(e.SID)
467+
if err != nil {
468+
return nil, err
469+
}
470+
return ParseMysqlGTIDSet(strings.Join([]string{u.String(), strconv.FormatInt(e.GNO, 10)}, ":"))
471+
}
472+
465473
// ImmediateCommitTime returns the commit time of this trx on the immediate server
466474
// or zero time if not available.
467475
func (e *GTIDEvent) ImmediateCommitTime() time.Time {
@@ -625,6 +633,10 @@ func (e *MariadbGTIDEvent) Dump(w io.Writer) {
625633
fmt.Fprintln(w)
626634
}
627635

636+
func (e *MariadbGTIDEvent) GTIDNext() (GTIDSet, error) {
637+
return ParseMariadbGTIDSet(e.GTID.String())
638+
}
639+
628640
type MariadbGTIDListEvent struct {
629641
GTIDs []MariadbGTID
630642
}

replication/event_test.go

+23-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package replication
22

33
import (
4+
"fmt"
45
"testing"
56

7+
"github.com/stretchr/testify/assert"
68
"github.com/stretchr/testify/require"
79
)
810

@@ -49,6 +51,9 @@ func TestMariadbGTIDEvent(t *testing.T) {
4951
require.True(t, ev.IsStandalone())
5052
require.True(t, ev.IsGroupCommit())
5153
require.Equal(t, uint64(0x1716151413121110), ev.CommitID)
54+
set, err := ev.GTIDNext()
55+
require.NoError(t, err)
56+
require.Equal(t, "70975786-0-578437695752307201", set.String())
5257
}
5358

5459
func TestGTIDEventMysql8NewFields(t *testing.T) {
@@ -59,6 +64,9 @@ func TestGTIDEventMysql8NewFields(t *testing.T) {
5964
expectTransactionLength uint64
6065
expectImmediateServerVersion uint32
6166
expectOriginalServerVersion uint32
67+
expectGTID string
68+
expectSequnceNumber int64
69+
expectLastCommitted int64
6270
}{
6371
{
6472
// master: mysql-5.7, slave: mysql-8.0
@@ -68,6 +76,9 @@ func TestGTIDEventMysql8NewFields(t *testing.T) {
6876
expectTransactionLength: 965,
6977
expectImmediateServerVersion: 80019,
7078
expectOriginalServerVersion: 0,
79+
expectGTID: "5aa72a7f-44a8-11ea-947f-0242ac190002:258",
80+
expectSequnceNumber: 119,
81+
expectLastCommitted: 118,
7182
},
7283
{
7384
// mysql-5.7 only
@@ -77,6 +88,9 @@ func TestGTIDEventMysql8NewFields(t *testing.T) {
7788
expectTransactionLength: 0,
7889
expectImmediateServerVersion: 0,
7990
expectOriginalServerVersion: 0,
91+
expectGTID: "5aa72a7f-44a8-11ea-947f-0242ac190002:259",
92+
expectSequnceNumber: 54,
93+
expectLastCommitted: 53,
8094
},
8195
{
8296
// mysql-8.0 only
@@ -86,10 +100,13 @@ func TestGTIDEventMysql8NewFields(t *testing.T) {
86100
expectTransactionLength: 963,
87101
expectImmediateServerVersion: 80019,
88102
expectOriginalServerVersion: 80019,
103+
expectGTID: "5ccc1033-44a8-11ea-bd59-0242ac190003:119",
104+
expectSequnceNumber: 121,
105+
expectLastCommitted: 120,
89106
},
90107
}
91108

92-
for _, tc := range testcases {
109+
for i, tc := range testcases {
93110
ev := new(GTIDEvent)
94111
err := ev.Decode(tc.data)
95112
require.NoError(t, err)
@@ -98,6 +115,11 @@ func TestGTIDEventMysql8NewFields(t *testing.T) {
98115
require.Equal(t, tc.expectTransactionLength, ev.TransactionLength)
99116
require.Equal(t, tc.expectImmediateServerVersion, ev.ImmediateServerVersion)
100117
require.Equal(t, tc.expectOriginalServerVersion, ev.OriginalServerVersion)
118+
set, err := ev.GTIDNext()
119+
require.NoError(t, err)
120+
assert.Equal(t, tc.expectGTID, set.String(), fmt.Sprintf("testcase: %d", i))
121+
assert.Equal(t, tc.expectSequnceNumber, ev.SequenceNumber, fmt.Sprintf("testcase: %d", i))
122+
assert.Equal(t, tc.expectLastCommitted, ev.LastCommitted, fmt.Sprintf("testcase: %d", i))
101123
}
102124
}
103125

0 commit comments

Comments
 (0)