Skip to content

Commit a4d75d6

Browse files
committed
allow edit logger in binlogsyncer and canal
1 parent 33ea963 commit a4d75d6

File tree

6 files changed

+63
-47
lines changed

6 files changed

+63
-47
lines changed

canal/canal.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"github.com/go-mysql-org/go-mysql/schema"
2020
"github.com/pingcap/errors"
2121
"github.com/pingcap/parser"
22-
"github.com/siddontang/go-log/log"
2322
)
2423

2524
// Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc...
@@ -222,14 +221,14 @@ func (c *Canal) run() error {
222221
close(c.dumpDoneCh)
223222

224223
if err != nil {
225-
log.Errorf("canal dump mysql err: %v", err)
224+
c.cfg.Logger.Errorf("canal dump mysql err: %v", err)
226225
return errors.Trace(err)
227226
}
228227
}
229228

230229
if err := c.runSyncBinlog(); err != nil {
231230
if errors.Cause(err) != context.Canceled {
232-
log.Errorf("canal start sync binlog err: %v", err)
231+
c.cfg.Logger.Errorf("canal start sync binlog err: %v", err)
233232
return errors.Trace(err)
234233
}
235234
}
@@ -238,7 +237,7 @@ func (c *Canal) run() error {
238237
}
239238

240239
func (c *Canal) Close() {
241-
log.Infof("closing canal")
240+
c.cfg.Logger.Infof("closing canal")
242241
c.m.Lock()
243242
defer c.m.Unlock()
244243

@@ -353,7 +352,7 @@ func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
353352
c.errorTablesGetTime[key] = time.Now()
354353
c.tableLock.Unlock()
355354
// log error and return ErrMissingTableMeta
356-
log.Errorf("canal get table meta err: %v", errors.Trace(err))
355+
c.cfg.Logger.Errorf("canal get table meta err: %v", errors.Trace(err))
357356
return nil, schema.ErrMissingTableMeta
358357
}
359358
return nil, err
@@ -427,6 +426,7 @@ func (c *Canal) prepareSyncer() error {
427426
DisableRetrySync: c.cfg.DisableRetrySync,
428427
TimestampStringLocation: c.cfg.TimestampStringLocation,
429428
TLSConfig: c.cfg.TLSConfig,
429+
Logger: c.cfg.Logger,
430430
}
431431

432432
if strings.Contains(c.cfg.Addr, "/") {

canal/config.go

+8
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import (
44
"crypto/tls"
55
"io/ioutil"
66
"math/rand"
7+
"os"
78
"time"
89

910
"github.com/BurntSushi/toml"
1011
"github.com/go-mysql-org/go-mysql/mysql"
1112
"github.com/pingcap/errors"
13+
"github.com/siddontang/go-log/log"
1214
)
1315

1416
type DumpConfig struct {
@@ -86,6 +88,9 @@ type Config struct {
8688

8789
// Set TLS config
8890
TLSConfig *tls.Config
91+
92+
//Set Logger
93+
Logger *log.Logger
8994
}
9095

9196
func NewConfigWithFile(name string) (*Config, error) {
@@ -124,5 +129,8 @@ func NewDefaultConfig() *Config {
124129
c.Dump.DiscardErr = true
125130
c.Dump.SkipMasterData = false
126131

132+
streamHandler, _ := log.NewStreamHandler(os.Stdout)
133+
c.Logger = log.NewDefault(streamHandler)
134+
127135
return c
128136
}

canal/dump.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/go-mysql-org/go-mysql/schema"
1212
"github.com/pingcap/errors"
1313
"github.com/shopspring/decimal"
14-
"github.com/siddontang/go-log/log"
1514
)
1615

1716
type dumpParseHandler struct {
@@ -49,7 +48,7 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error
4948
e == schema.ErrMissingTableMeta {
5049
return nil
5150
}
52-
log.Errorf("get %s.%s information err: %v", db, table, err)
51+
h.c.cfg.Logger.Errorf("get %s.%s information err: %v", db, table, err)
5352
return errors.Trace(err)
5453
}
5554

@@ -163,13 +162,13 @@ func (c *Canal) dump() error {
163162
if err != nil {
164163
return errors.Trace(err)
165164
}
166-
log.Infof("skip master data, get current binlog position %v", pos)
165+
c.cfg.Logger.Infof("skip master data, get current binlog position %v", pos)
167166
h.name = pos.Name
168167
h.pos = uint64(pos.Pos)
169168
}
170169

171170
start := time.Now()
172-
log.Info("try dump MySQL and parse")
171+
c.cfg.Logger.Info("try dump MySQL and parse")
173172
if err := c.dumper.DumpAndParse(h); err != nil {
174173
return errors.Trace(err)
175174
}
@@ -185,7 +184,7 @@ func (c *Canal) dump() error {
185184
c.master.UpdateGTIDSet(h.gset)
186185
startPos = h.gset
187186
}
188-
log.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at %s",
187+
c.cfg.Logger.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at %s",
189188
time.Since(start).Seconds(), startPos)
190189
return nil
191190
}
@@ -196,12 +195,12 @@ func (c *Canal) tryDump() error {
196195
if (len(pos.Name) > 0 && pos.Pos > 0) ||
197196
(gset != nil && gset.String() != "") {
198197
// we will sync with binlog name and position
199-
log.Infof("skip dump, use last binlog replication pos %s or GTID set %v", pos, gset)
198+
c.cfg.Logger.Infof("skip dump, use last binlog replication pos %s or GTID set %v", pos, gset)
200199
return nil
201200
}
202201

203202
if c.dumper == nil {
204-
log.Info("skip dump, no mysqldump")
203+
c.cfg.Logger.Info("skip dump, no mysqldump")
205204
return nil
206205
}
207206

canal/master.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,28 @@ type masterInfo struct {
1515
gset mysql.GTIDSet
1616

1717
timestamp uint32
18+
19+
logger *log.Logger
1820
}
1921

2022
func (m *masterInfo) Update(pos mysql.Position) {
21-
log.Debugf("update master position %s", pos)
23+
m.logger.Debugf("update master position %s", pos)
2224

2325
m.Lock()
2426
m.pos = pos
2527
m.Unlock()
2628
}
2729

2830
func (m *masterInfo) UpdateTimestamp(ts uint32) {
29-
log.Debugf("update master timestamp %d", ts)
31+
m.logger.Debugf("update master timestamp %d", ts)
3032

3133
m.Lock()
3234
m.timestamp = ts
3335
m.Unlock()
3436
}
3537

3638
func (m *masterInfo) UpdateGTIDSet(gset mysql.GTIDSet) {
37-
log.Debugf("update master gtid set %s", gset)
39+
m.logger.Debugf("update master gtid set %s", gset)
3840

3941
m.Lock()
4042
m.gset = gset

canal/sync.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/google/uuid"
1212
"github.com/pingcap/errors"
1313
"github.com/pingcap/parser/ast"
14-
"github.com/siddontang/go-log/log"
1514
)
1615

1716
func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
@@ -22,15 +21,15 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
2221
if err != nil {
2322
return nil, errors.Errorf("start sync replication at binlog %v error %v", pos, err)
2423
}
25-
log.Infof("start sync binlog at binlog file %v", pos)
24+
c.cfg.Logger.Infof("start sync binlog at binlog file %v", pos)
2625
return s, nil
2726
} else {
2827
gsetClone := gset.Clone()
2928
s, err := c.syncer.StartSyncGTID(gset)
3029
if err != nil {
3130
return nil, errors.Errorf("start sync replication at GTID set %v error %v", gset, err)
3231
}
33-
log.Infof("start sync binlog at GTID set %v", gsetClone)
32+
c.cfg.Logger.Infof("start sync binlog at GTID set %v", gsetClone)
3433
return s, nil
3534
}
3635
}
@@ -65,7 +64,7 @@ func (c *Canal) runSyncBinlog() error {
6564
switch e := ev.Event.(type) {
6665
case *replication.RotateEvent:
6766
fakeRotateLogName = string(e.NextLogName)
68-
log.Infof("received fake rotate event, next log name is %s", e.NextLogName)
67+
c.cfg.Logger.Infof("received fake rotate event, next log name is %s", e.NextLogName)
6968
}
7069

7170
continue
@@ -76,6 +75,7 @@ func (c *Canal) runSyncBinlog() error {
7675
pos := c.master.Position()
7776

7877
curPos := pos.Pos
78+
7979
// next binlog pos
8080
pos.Pos = ev.Header.LogPos
8181

@@ -92,7 +92,7 @@ func (c *Canal) runSyncBinlog() error {
9292
case *replication.RotateEvent:
9393
pos.Name = string(e.NextLogName)
9494
pos.Pos = uint32(e.Position)
95-
log.Infof("rotate binlog to %s", pos)
95+
c.cfg.Logger.Infof("rotate binlog to %s", pos)
9696
savePos = true
9797
force = true
9898
if err = c.eventHandler.OnRotate(e); err != nil {
@@ -107,7 +107,7 @@ func (c *Canal) runSyncBinlog() error {
107107
if e != ErrExcludedTable &&
108108
e != schema.ErrTableNotExist &&
109109
e != schema.ErrMissingTableMeta {
110-
log.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
110+
c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
111111
return errors.Trace(err)
112112
}
113113
}
@@ -142,7 +142,7 @@ func (c *Canal) runSyncBinlog() error {
142142
case *replication.QueryEvent:
143143
stmts, _, err := c.parser.Parse(string(e.Query), "", "")
144144
if err != nil {
145-
log.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
145+
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
146146
continue
147147
}
148148
for _, stmt := range stmts {
@@ -230,7 +230,7 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) {
230230

231231
func (c *Canal) updateTable(db, table string) (err error) {
232232
c.ClearTableCache([]byte(db), []byte(table))
233-
log.Infof("table structure changed, clear table cache: %s.%s\n", db, table)
233+
c.cfg.Logger.Infof("table structure changed, clear table cache: %s.%s\n", db, table)
234234
if err = c.eventHandler.OnTableChanged(db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist {
235235
return errors.Trace(err)
236236
}
@@ -291,7 +291,7 @@ func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error {
291291
if curPos.Compare(pos) >= 0 {
292292
return nil
293293
} else {
294-
log.Debugf("master pos is %v, wait catching %v", curPos, pos)
294+
c.cfg.Logger.Debugf("master pos is %v, wait catching %v", curPos, pos)
295295
time.Sleep(100 * time.Millisecond)
296296
}
297297
}

0 commit comments

Comments
 (0)