Skip to content

Commit 5076dc2

Browse files
committed
address comments
1 parent 282a0ea commit 5076dc2

File tree

7 files changed

+43
-24
lines changed

7 files changed

+43
-24
lines changed

canal/handler.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type EventHandler interface {
1414
OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error
1515
OnRow(e *RowsEvent) error
1616
OnXID(header *replication.EventHeader, nextPos mysql.Position) error
17-
OnGTID(header *replication.EventHeader, gtidEvent mysql.GTIDEvent) error
17+
OnGTID(header *replication.EventHeader, gtidEvent mysql.BinlogGTIDEvent) error
1818
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
1919
OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
2020
String() string
@@ -32,9 +32,11 @@ func (h *DummyEventHandler) OnTableChanged(*replication.EventHeader, string, str
3232
func (h *DummyEventHandler) OnDDL(*replication.EventHeader, mysql.Position, *replication.QueryEvent) error {
3333
return nil
3434
}
35-
func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil }
36-
func (h *DummyEventHandler) OnXID(*replication.EventHeader, mysql.Position) error { return nil }
37-
func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.GTIDEvent) error { return nil }
35+
func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil }
36+
func (h *DummyEventHandler) OnXID(*replication.EventHeader, mysql.Position) error { return nil }
37+
func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.BinlogGTIDEvent) error {
38+
return nil
39+
}
3840
func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error {
3941
return nil
4042
}

canal/sync.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func (c *Canal) runSyncBinlog() error {
119119
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
120120
return errors.Trace(err)
121121
}
122-
case *replication.MySQLGTIDEvent:
122+
case *replication.GTIDEvent:
123123
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
124124
return errors.Trace(err)
125125
}

mysql/gtid.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ func ParseGTIDSet(flavor string, s string) (GTIDSet, error) {
3030
}
3131
}
3232

33-
type GTIDEvent interface {
33+
type BinlogGTIDEvent interface {
3434
GTIDNext() (GTIDSet, error)
35-
Decode(data []byte) error
35+
SequenceNumber() int64
36+
LastCommitted() int64
3637
}

replication/binlogsyncer.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ type BinlogSyncer struct {
141141
prevGset, currGset GTIDSet
142142

143143
// instead of GTIDSet.Clone, use this to speed up calculate prevGset
144-
prevMySQLGTIDEvent *MySQLGTIDEvent
144+
prevMySQLGTIDEvent *GTIDEvent
145145

146146
running bool
147147

@@ -811,7 +811,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
811811
b.nextPos.Name = string(event.NextLogName)
812812
b.nextPos.Pos = uint32(event.Position)
813813
b.cfg.Logger.Infof("rotate to %s", b.nextPos)
814-
case *MySQLGTIDEvent:
814+
case *GTIDEvent:
815815
if b.prevGset == nil {
816816
break
817817
}

replication/event.go

+28-12
Original file line numberDiff line numberDiff line change
@@ -356,12 +356,12 @@ func (e *QueryEvent) Dump(w io.Writer) {
356356
fmt.Fprintln(w)
357357
}
358358

359-
type MySQLGTIDEvent struct {
359+
type GTIDEvent struct {
360360
CommitFlag uint8
361361
SID []byte
362362
GNO int64
363-
LastCommitted int64
364-
SequenceNumber int64
363+
lastCommitted int64
364+
sequenceNumber int64
365365

366366
// ImmediateCommitTimestamp/OriginalCommitTimestamp are introduced in MySQL-8.0.1, see:
367367
// https://mysqlhighavailability.com/replication-features-in-mysql-8-0-1/
@@ -378,7 +378,7 @@ type MySQLGTIDEvent struct {
378378
OriginalServerVersion uint32
379379
}
380380

381-
func (e *MySQLGTIDEvent) Decode(data []byte) error {
381+
func (e *GTIDEvent) Decode(data []byte) error {
382382
pos := 0
383383
e.CommitFlag = data[pos]
384384
pos++
@@ -390,9 +390,9 @@ func (e *MySQLGTIDEvent) Decode(data []byte) error {
390390
if len(data) >= 42 {
391391
if data[pos] == LogicalTimestampTypeCode {
392392
pos++
393-
e.LastCommitted = int64(binary.LittleEndian.Uint64(data[pos:]))
393+
e.lastCommitted = int64(binary.LittleEndian.Uint64(data[pos:]))
394394
pos += PartLogicalTimestampLength
395-
e.SequenceNumber = int64(binary.LittleEndian.Uint64(data[pos:]))
395+
e.sequenceNumber = int64(binary.LittleEndian.Uint64(data[pos:]))
396396
pos += 8
397397

398398
// IMMEDIATE_COMMIT_TIMESTAMP_LENGTH = 7
@@ -441,7 +441,7 @@ func (e *MySQLGTIDEvent) Decode(data []byte) error {
441441
return nil
442442
}
443443

444-
func (e *MySQLGTIDEvent) Dump(w io.Writer) {
444+
func (e *GTIDEvent) Dump(w io.Writer) {
445445
fmtTime := func(t time.Time) string {
446446
if t.IsZero() {
447447
return "<n/a>"
@@ -452,8 +452,8 @@ func (e *MySQLGTIDEvent) Dump(w io.Writer) {
452452
fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag)
453453
u, _ := uuid.FromBytes(e.SID)
454454
fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO)
455-
fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted)
456-
fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber)
455+
fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.lastCommitted)
456+
fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.sequenceNumber)
457457
fmt.Fprintf(w, "Immediate commmit timestamp: %d (%s)\n", e.ImmediateCommitTimestamp, fmtTime(e.ImmediateCommitTime()))
458458
fmt.Fprintf(w, "Orignal commmit timestamp: %d (%s)\n", e.OriginalCommitTimestamp, fmtTime(e.OriginalCommitTime()))
459459
fmt.Fprintf(w, "Transaction length: %d\n", e.TransactionLength)
@@ -462,23 +462,31 @@ func (e *MySQLGTIDEvent) Dump(w io.Writer) {
462462
fmt.Fprintln(w)
463463
}
464464

465-
func (e *MySQLGTIDEvent) GTIDNext() (GTIDSet, error) {
465+
func (e *GTIDEvent) GTIDNext() (GTIDSet, error) {
466466
u, err := uuid.FromBytes(e.SID)
467467
if err != nil {
468468
return nil, err
469469
}
470470
return ParseMysqlGTIDSet(strings.Join([]string{u.String(), strconv.FormatInt(e.GNO, 10)}, ":"))
471471
}
472472

473+
func (e *GTIDEvent) SequenceNumber() int64 {
474+
return e.sequenceNumber
475+
}
476+
477+
func (e *GTIDEvent) LastCommitted() int64 {
478+
return e.lastCommitted
479+
}
480+
473481
// ImmediateCommitTime returns the commit time of this trx on the immediate server
474482
// or zero time if not available.
475-
func (e *MySQLGTIDEvent) ImmediateCommitTime() time.Time {
483+
func (e *GTIDEvent) ImmediateCommitTime() time.Time {
476484
return microSecTimestampToTime(e.ImmediateCommitTimestamp)
477485
}
478486

479487
// OriginalCommitTime returns the commit time of this trx on the original server
480488
// or zero time if not available.
481-
func (e *MySQLGTIDEvent) OriginalCommitTime() time.Time {
489+
func (e *GTIDEvent) OriginalCommitTime() time.Time {
482490
return microSecTimestampToTime(e.OriginalCommitTimestamp)
483491
}
484492

@@ -637,6 +645,14 @@ func (e *MariadbGTIDEvent) GTIDNext() (GTIDSet, error) {
637645
return ParseMariadbGTIDSet(e.GTID.String())
638646
}
639647

648+
func (e *MariadbGTIDEvent) SequenceNumber() int64 {
649+
return int64(e.GTID.SequenceNumber)
650+
}
651+
652+
func (e *MariadbGTIDEvent) LastCommitted() int64 {
653+
return int64(e.GTID.SequenceNumber - 1)
654+
}
655+
640656
type MariadbGTIDListEvent struct {
641657
GTIDs []MariadbGTID
642658
}

replication/event_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func TestGTIDEventMysql8NewFields(t *testing.T) {
9090
}
9191

9292
for _, tc := range testcases {
93-
ev := new(MySQLGTIDEvent)
93+
ev := new(GTIDEvent)
9494
err := ev.Decode(tc.data)
9595
require.NoError(t, err)
9696
require.Equal(t, tc.expectImmediateCommitTimestamp, ev.ImmediateCommitTimestamp)

replication/parser.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -283,9 +283,9 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
283283
case ROWS_QUERY_EVENT:
284284
e = &RowsQueryEvent{}
285285
case GTID_EVENT:
286-
e = &MySQLGTIDEvent{}
286+
e = &GTIDEvent{}
287287
case ANONYMOUS_GTID_EVENT:
288-
e = &MySQLGTIDEvent{}
288+
e = &GTIDEvent{}
289289
case BEGIN_LOAD_QUERY_EVENT:
290290
e = &BeginLoadQueryEvent{}
291291
case EXECUTE_LOAD_QUERY_EVENT:

0 commit comments

Comments
 (0)