Skip to content

Allow logger override #699

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions canal/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/go-mysql-org/go-mysql/schema"
"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/siddontang/go-log/log"
)

// Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc...
Expand Down Expand Up @@ -72,7 +71,7 @@ func NewCanal(cfg *Config) (*Canal, error) {
if c.cfg.DiscardNoMetaRowEvent {
c.errorTablesGetTime = make(map[string]time.Time)
}
c.master = &masterInfo{}
c.master = &masterInfo{logger: c.cfg.Logger}

c.delay = new(uint32)

Expand Down Expand Up @@ -222,14 +221,14 @@ func (c *Canal) run() error {
close(c.dumpDoneCh)

if err != nil {
log.Errorf("canal dump mysql err: %v", err)
c.cfg.Logger.Errorf("canal dump mysql err: %v", err)
return errors.Trace(err)
}
}

if err := c.runSyncBinlog(); err != nil {
if errors.Cause(err) != context.Canceled {
log.Errorf("canal start sync binlog err: %v", err)
c.cfg.Logger.Errorf("canal start sync binlog err: %v", err)
return errors.Trace(err)
}
}
Expand All @@ -238,7 +237,7 @@ func (c *Canal) run() error {
}

func (c *Canal) Close() {
log.Infof("closing canal")
c.cfg.Logger.Infof("closing canal")
c.m.Lock()
defer c.m.Unlock()

Expand Down Expand Up @@ -353,7 +352,7 @@ func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
c.errorTablesGetTime[key] = time.Now()
c.tableLock.Unlock()
// log error and return ErrMissingTableMeta
log.Errorf("canal get table meta err: %v", errors.Trace(err))
c.cfg.Logger.Errorf("canal get table meta err: %v", errors.Trace(err))
return nil, schema.ErrMissingTableMeta
}
return nil, err
Expand Down Expand Up @@ -427,6 +426,7 @@ func (c *Canal) prepareSyncer() error {
DisableRetrySync: c.cfg.DisableRetrySync,
TimestampStringLocation: c.cfg.TimestampStringLocation,
TLSConfig: c.cfg.TLSConfig,
Logger: c.cfg.Logger,
}

if strings.Contains(c.cfg.Addr, "/") {
Expand Down
8 changes: 8 additions & 0 deletions canal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"crypto/tls"
"io/ioutil"
"math/rand"
"os"
"time"

"github.com/BurntSushi/toml"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/errors"
"github.com/siddontang/go-log/log"
)

type DumpConfig struct {
Expand Down Expand Up @@ -86,6 +88,9 @@ type Config struct {

// Set TLS config
TLSConfig *tls.Config

//Set Logger
Logger *log.Logger
}

func NewConfigWithFile(name string) (*Config, error) {
Expand Down Expand Up @@ -124,5 +129,8 @@ func NewDefaultConfig() *Config {
c.Dump.DiscardErr = true
c.Dump.SkipMasterData = false

streamHandler, _ := log.NewStreamHandler(os.Stdout)
c.Logger = log.NewDefault(streamHandler)

return c
}
13 changes: 6 additions & 7 deletions canal/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/go-mysql-org/go-mysql/schema"
"github.com/pingcap/errors"
"github.com/shopspring/decimal"
"github.com/siddontang/go-log/log"
)

type dumpParseHandler struct {
Expand Down Expand Up @@ -49,7 +48,7 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error
e == schema.ErrMissingTableMeta {
return nil
}
log.Errorf("get %s.%s information err: %v", db, table, err)
h.c.cfg.Logger.Errorf("get %s.%s information err: %v", db, table, err)
return errors.Trace(err)
}

Expand Down Expand Up @@ -163,13 +162,13 @@ func (c *Canal) dump() error {
if err != nil {
return errors.Trace(err)
}
log.Infof("skip master data, get current binlog position %v", pos)
c.cfg.Logger.Infof("skip master data, get current binlog position %v", pos)
h.name = pos.Name
h.pos = uint64(pos.Pos)
}

start := time.Now()
log.Info("try dump MySQL and parse")
c.cfg.Logger.Info("try dump MySQL and parse")
if err := c.dumper.DumpAndParse(h); err != nil {
return errors.Trace(err)
}
Expand All @@ -185,7 +184,7 @@ func (c *Canal) dump() error {
c.master.UpdateGTIDSet(h.gset)
startPos = h.gset
}
log.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at %s",
c.cfg.Logger.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at %s",
time.Since(start).Seconds(), startPos)
return nil
}
Expand All @@ -196,12 +195,12 @@ func (c *Canal) tryDump() error {
if (len(pos.Name) > 0 && pos.Pos > 0) ||
(gset != nil && gset.String() != "") {
// we will sync with binlog name and position
log.Infof("skip dump, use last binlog replication pos %s or GTID set %v", pos, gset)
c.cfg.Logger.Infof("skip dump, use last binlog replication pos %s or GTID set %v", pos, gset)
return nil
}

if c.dumper == nil {
log.Info("skip dump, no mysqldump")
c.cfg.Logger.Info("skip dump, no mysqldump")
return nil
}

Expand Down
8 changes: 5 additions & 3 deletions canal/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,28 @@ type masterInfo struct {
gset mysql.GTIDSet

timestamp uint32

logger *log.Logger
}

func (m *masterInfo) Update(pos mysql.Position) {
log.Debugf("update master position %s", pos)
m.logger.Debugf("update master position %s", pos)

m.Lock()
m.pos = pos
m.Unlock()
}

func (m *masterInfo) UpdateTimestamp(ts uint32) {
log.Debugf("update master timestamp %d", ts)
m.logger.Debugf("update master timestamp %d", ts)

m.Lock()
m.timestamp = ts
m.Unlock()
}

func (m *masterInfo) UpdateGTIDSet(gset mysql.GTIDSet) {
log.Debugf("update master gtid set %s", gset)
m.logger.Debugf("update master gtid set %s", gset)

m.Lock()
m.gset = gset
Expand Down
18 changes: 9 additions & 9 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/siddontang/go-log/log"
)

func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
Expand All @@ -22,15 +21,15 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
if err != nil {
return nil, errors.Errorf("start sync replication at binlog %v error %v", pos, err)
}
log.Infof("start sync binlog at binlog file %v", pos)
c.cfg.Logger.Infof("start sync binlog at binlog file %v", pos)
return s, nil
} else {
gsetClone := gset.Clone()
s, err := c.syncer.StartSyncGTID(gset)
if err != nil {
return nil, errors.Errorf("start sync replication at GTID set %v error %v", gset, err)
}
log.Infof("start sync binlog at GTID set %v", gsetClone)
c.cfg.Logger.Infof("start sync binlog at GTID set %v", gsetClone)
return s, nil
}
}
Expand Down Expand Up @@ -65,7 +64,7 @@ func (c *Canal) runSyncBinlog() error {
switch e := ev.Event.(type) {
case *replication.RotateEvent:
fakeRotateLogName = string(e.NextLogName)
log.Infof("received fake rotate event, next log name is %s", e.NextLogName)
c.cfg.Logger.Infof("received fake rotate event, next log name is %s", e.NextLogName)
}

continue
Expand All @@ -76,6 +75,7 @@ func (c *Canal) runSyncBinlog() error {
pos := c.master.Position()

curPos := pos.Pos

// next binlog pos
pos.Pos = ev.Header.LogPos

Expand All @@ -92,7 +92,7 @@ func (c *Canal) runSyncBinlog() error {
case *replication.RotateEvent:
pos.Name = string(e.NextLogName)
pos.Pos = uint32(e.Position)
log.Infof("rotate binlog to %s", pos)
c.cfg.Logger.Infof("rotate binlog to %s", pos)
savePos = true
force = true
if err = c.eventHandler.OnRotate(e); err != nil {
Expand All @@ -107,7 +107,7 @@ func (c *Canal) runSyncBinlog() error {
if e != ErrExcludedTable &&
e != schema.ErrTableNotExist &&
e != schema.ErrMissingTableMeta {
log.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not replace this with a call to c.cfg.Logger.Errorf?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can if you want!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should log errors in the library, just return them

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that seems like the consistent thing to do

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure I will change

c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
return errors.Trace(err)
}
}
Expand Down Expand Up @@ -142,7 +142,7 @@ func (c *Canal) runSyncBinlog() error {
case *replication.QueryEvent:
stmts, _, err := c.parser.Parse(string(e.Query), "", "")
if err != nil {
log.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
continue
}
for _, stmt := range stmts {
Expand Down Expand Up @@ -230,7 +230,7 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) {

func (c *Canal) updateTable(db, table string) (err error) {
c.ClearTableCache([]byte(db), []byte(table))
log.Infof("table structure changed, clear table cache: %s.%s\n", db, table)
c.cfg.Logger.Infof("table structure changed, clear table cache: %s.%s\n", db, table)
if err = c.eventHandler.OnTableChanged(db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist {
return errors.Trace(err)
}
Expand Down Expand Up @@ -291,7 +291,7 @@ func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error {
if curPos.Compare(pos) >= 0 {
return nil
} else {
log.Debugf("master pos is %v, wait catching %v", curPos, pos)
c.cfg.Logger.Debugf("master pos is %v, wait catching %v", curPos, pos)
time.Sleep(100 * time.Millisecond)
}
}
Expand Down
Loading