Skip to content

Commit bc5406a

Browse files
authored
Merge pull request #738 from GMHDBJD/speedUpGTID
speed up gtid process for mysql
2 parents 49d58c4 + 2da6a2c commit bc5406a

File tree

4 files changed

+135
-9
lines changed

4 files changed

+135
-9
lines changed

mysql/gtid.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package mysql
22

3-
import "github.com/pingcap/errors"
3+
import (
4+
"github.com/pingcap/errors"
5+
)
46

57
type GTIDSet interface {
68
String() string

mysql/mysql_gtid.go

+55-4
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,48 @@ func (s IntervalSlice) Normalize() IntervalSlice {
111111
return n
112112
}
113113

114+
func min(a, b int64) int64 {
115+
if a < b {
116+
return a
117+
}
118+
return b
119+
}
120+
121+
func max(a, b int64) int64 {
122+
if a > b {
123+
return a
124+
}
125+
return b
126+
}
127+
128+
func (s *IntervalSlice) InsertInterval(interval Interval) {
129+
var (
130+
count int
131+
i int
132+
)
133+
134+
*s = append(*s, interval)
135+
total := len(*s)
136+
for i = total - 1; i > 0; i-- {
137+
if (*s)[i].Stop < (*s)[i-1].Start {
138+
(*s)[i], (*s)[i-1] = (*s)[i-1], (*s)[i]
139+
} else if (*s)[i].Start > (*s)[i-1].Stop {
140+
break
141+
} else {
142+
(*s)[i-1].Start = min((*s)[i-1].Start, (*s)[i].Start)
143+
(*s)[i-1].Stop = max((*s)[i-1].Stop, (*s)[i].Stop)
144+
count++
145+
}
146+
}
147+
if count > 0 {
148+
i++
149+
if i+count < total {
150+
copy((*s)[i:], (*s)[i+count:])
151+
}
152+
*s = (*s)[:total-count]
153+
}
154+
}
155+
114156
// Contain returns true if sub in s
115157
func (s IntervalSlice) Contain(sub IntervalSlice) bool {
116158
j := 0
@@ -343,10 +385,9 @@ func (s *UUIDSet) Decode(data []byte) error {
343385

344386
func (s *UUIDSet) Clone() *UUIDSet {
345387
clone := new(UUIDSet)
346-
347-
copy(clone.SID[:], s.SID[:])
348-
clone.Intervals = s.Intervals.Normalize()
349-
388+
clone.SID = s.SID
389+
clone.Intervals = make([]Interval, len(s.Intervals))
390+
copy(clone.Intervals, s.Intervals)
350391
return clone
351392
}
352393

@@ -439,6 +480,16 @@ func (s *MysqlGTIDSet) Update(GTIDStr string) error {
439480
return nil
440481
}
441482

483+
func (s *MysqlGTIDSet) AddGTID(uuid uuid.UUID, gno int64) {
484+
sid := uuid.String()
485+
o, ok := s.Sets[sid]
486+
if ok {
487+
o.Intervals.InsertInterval(Interval{gno, gno + 1})
488+
} else {
489+
s.Sets[sid] = &UUIDSet{uuid, IntervalSlice{Interval{gno, gno + 1}}}
490+
}
491+
}
492+
442493
func (s *MysqlGTIDSet) Add(addend MysqlGTIDSet) error {
443494
for _, uuidSet := range addend.Sets {
444495
s.AddSet(uuidSet)

mysql/mysql_test.go

+66
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"strings"
66
"testing"
77

8+
"github.com/google/uuid"
89
"github.com/pingcap/check"
910
)
1011

@@ -77,6 +78,42 @@ func (t *mysqlTestSuite) TestMysqlGTIDIntervalSlice(c *check.C) {
7778
c.Assert(n2.Contain(n1), check.Equals, true)
7879
}
7980

81+
func (t *mysqlTestSuite) TestMysqlGTIDInsertInterval(c *check.C) {
82+
i := IntervalSlice{Interval{100, 200}}
83+
i.InsertInterval(Interval{300, 400})
84+
c.Assert(i, check.DeepEquals, IntervalSlice{Interval{100, 200}, Interval{300, 400}})
85+
86+
i.InsertInterval(Interval{50, 70})
87+
c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 70}, Interval{100, 200}, Interval{300, 400}})
88+
89+
i.InsertInterval(Interval{101, 201})
90+
c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 70}, Interval{100, 201}, Interval{300, 400}})
91+
92+
i.InsertInterval(Interval{99, 202})
93+
c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 70}, Interval{99, 202}, Interval{300, 400}})
94+
95+
i.InsertInterval(Interval{102, 302})
96+
c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 70}, Interval{99, 400}})
97+
98+
i.InsertInterval(Interval{500, 600})
99+
c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 70}, Interval{99, 400}, Interval{500, 600}})
100+
101+
i.InsertInterval(Interval{50, 100})
102+
c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 400}, Interval{500, 600}})
103+
104+
i.InsertInterval(Interval{900, 1000})
105+
c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 400}, Interval{500, 600}, Interval{900, 1000}})
106+
107+
i.InsertInterval(Interval{1010, 1020})
108+
c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 400}, Interval{500, 600}, Interval{900, 1000}, Interval{1010, 1020}})
109+
110+
i.InsertInterval(Interval{49, 1000})
111+
c.Assert(i, check.DeepEquals, IntervalSlice{Interval{49, 1000}, Interval{1010, 1020}})
112+
113+
i.InsertInterval(Interval{1, 1012})
114+
c.Assert(i, check.DeepEquals, IntervalSlice{Interval{1, 1020}})
115+
}
116+
80117
func (t *mysqlTestSuite) TestMysqlGTIDCodec(c *check.C) {
81118
us, err := ParseUUIDSet("de278ad0-2106-11e4-9f8e-6edd0ca20947:1-2")
82119
c.Assert(err, check.IsNil)
@@ -129,6 +166,35 @@ func (t *mysqlTestSuite) TestMysqlUpdate(c *check.C) {
129166
c.Assert(g2.Equal(g1), check.IsTrue)
130167
}
131168

169+
func (t *mysqlTestSuite) TestMysqlAddGTID(c *check.C) {
170+
g, err := ParseMysqlGTIDSet("3E11FA47-71CA-11E1-9E33-C80AA9429562:21-57")
171+
c.Assert(err, check.IsNil)
172+
173+
g1 := g.(*MysqlGTIDSet)
174+
175+
u, err := uuid.Parse("3E11FA47-71CA-11E1-9E33-C80AA9429562")
176+
c.Assert(err, check.IsNil)
177+
178+
g1.AddGTID(u, 58)
179+
c.Assert(strings.ToUpper(g1.String()), check.Equals, "3E11FA47-71CA-11E1-9E33-C80AA9429562:21-58")
180+
181+
g1.AddGTID(u, 60)
182+
c.Assert(strings.ToUpper(g1.String()), check.Equals, "3E11FA47-71CA-11E1-9E33-C80AA9429562:21-58:60")
183+
184+
g1.AddGTID(u, 59)
185+
c.Assert(strings.ToUpper(g1.String()), check.Equals, "3E11FA47-71CA-11E1-9E33-C80AA9429562:21-60")
186+
187+
u2, err := uuid.Parse("519CE70F-A893-11E9-A95A-B32DC65A7026")
188+
c.Assert(err, check.IsNil)
189+
g1.AddGTID(u2, 58)
190+
g2, err := ParseMysqlGTIDSet(`
191+
3E11FA47-71CA-11E1-9E33-C80AA9429562:21-60,
192+
519CE70F-A893-11E9-A95A-B32DC65A7026:58
193+
`)
194+
c.Assert(err, check.IsNil)
195+
c.Assert(g2.Equal(g1), check.IsTrue)
196+
}
197+
132198
func (t *mysqlTestSuite) TestMysqlGTIDContain(c *check.C) {
133199
g1, err := ParseMysqlGTIDSet("3E11FA47-71CA-11E1-9E33-C80AA9429562:23")
134200
c.Assert(err, check.IsNil)

replication/binlogsyncer.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -799,12 +799,19 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
799799
return b.currGset.Clone()
800800
}
801801

802-
advanceCurrentGtidSet := func(gtid string) error {
802+
advanceCurrentGtidSet := func(uuid uuid.UUID, gno int64, domainID uint32, serverID uint32, sequenceNumber uint64) (err error) {
803803
if b.currGset == nil {
804804
b.currGset = b.prevGset.Clone()
805805
}
806806
prev := b.currGset.Clone()
807-
err := b.currGset.Update(gtid)
807+
switch gset := b.currGset.(type) {
808+
case *MysqlGTIDSet:
809+
gset.AddGTID(uuid, gno)
810+
case *MariadbGTIDSet:
811+
err = gset.AddSet(&MariadbGTID{DomainID: domainID, ServerID: serverID, SequenceNumber: sequenceNumber})
812+
default:
813+
err = errors.Errorf("unsupported GTIDSet type %T", b.currGset)
814+
}
808815
if err == nil {
809816
// right after reconnect we will see same gtid as we saw before, thus currGset will not get changed
810817
if !b.currGset.Equal(prev) {
@@ -824,7 +831,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
824831
break
825832
}
826833
u, _ := uuid.FromBytes(event.SID)
827-
err := advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), event.GNO))
834+
err := advanceCurrentGtidSet(u, event.GNO, 0, 0, 0)
828835
if err != nil {
829836
return errors.Trace(err)
830837
}
@@ -833,7 +840,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
833840
break
834841
}
835842
GTID := event.GTID
836-
err := advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
843+
err := advanceCurrentGtidSet(uuid.Nil, 0, GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)
837844
if err != nil {
838845
return errors.Trace(err)
839846
}

0 commit comments

Comments
 (0)