Skip to content

replace github.com/siddontang/go-log with log/slog #993

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 3 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ package main

import (
"github.com/go-mysql-org/go-mysql/canal"
"github.com/siddontang/go-log/log"
)

type MyEventHandler struct {
Expand Down Expand Up @@ -491,27 +490,10 @@ We pass all tests in https://github.com/bradfitz/go-sql-test using go-mysql driv

## Logging

Logging by default is send to stdout.

To disable logging completely:
```go
import "github.com/siddontang/go-log/log"
...
nullHandler, _ := log.NewNullHandler()
cfg.Logger = log.NewDefault(nullHandler)
```

To write logging to any [`io.Writer`](https://pkg.go.dev/io#Writer):
```go
import "github.com/siddontang/go-log/log"
...
w := ...
streamHandler, _ := log.NewStreamHandler(w)
cfg.Logger = log.NewDefault(streamHandler)
```

Or you can implement your own [`log.Handler`](https://pkg.go.dev/github.com/siddontang/go-log/log#Handler).
Logging uses [log/slog](https://pkg.go.dev/log/slog) and by default is sent to standard out.

For the old logging package `github.com/siddontang/go-log/log`, a converting package
`https://github.com/serprex/slog-siddontang` is available.
## How to migrate to this repo
To change the used package in your repo it's enough to add this `replace` directive to your `go.mod`:
```
Expand Down
13 changes: 6 additions & 7 deletions canal/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"log/slog"
"net"
"os"
"regexp"
Expand All @@ -21,7 +22,6 @@ import (
"github.com/go-mysql-org/go-mysql/utils"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/parser"
"github.com/siddontang/go-log/log"
)

// Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc...
Expand Down Expand Up @@ -66,8 +66,7 @@ var (
func NewCanal(cfg *Config) (*Canal, error) {
c := new(Canal)
if cfg.Logger == nil {
streamHandler, _ := log.NewStreamHandler(os.Stdout)
cfg.Logger = log.NewDefault(streamHandler)
cfg.Logger = slog.Default()
}
if cfg.Dialer == nil {
dialer := &net.Dialer{}
Expand Down Expand Up @@ -243,14 +242,14 @@ func (c *Canal) run() error {
close(c.dumpDoneCh)

if err != nil {
c.cfg.Logger.Errorf("canal dump mysql err: %v", err)
c.cfg.Logger.Error("canal dump mysql err", slog.Any("error", err))
return errors.Trace(err)
}
}

if err := c.runSyncBinlog(); err != nil {
if errors.Cause(err) != context.Canceled {
c.cfg.Logger.Errorf("canal start sync binlog err: %v", err)
c.cfg.Logger.Error("canal start sync binlog err", slog.Any("error", err))
return errors.Trace(err)
}
}
Expand All @@ -259,7 +258,7 @@ func (c *Canal) run() error {
}

func (c *Canal) Close() {
c.cfg.Logger.Infof("closing canal")
c.cfg.Logger.Info("closing canal")
c.m.Lock()
defer c.m.Unlock()

Expand Down Expand Up @@ -379,7 +378,7 @@ func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
c.errorTablesGetTime[key] = utils.Now()
c.tableLock.Unlock()
// log error and return ErrMissingTableMeta
c.cfg.Logger.Errorf("canal get table meta err: %v", errors.Trace(err))
c.cfg.Logger.Error("canal get table meta err", slog.Any("error", errors.Trace(err)))
return nil, schema.ErrMissingTableMeta
}
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions canal/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/pingcap/tidb/pkg/parser"
"github.com/siddontang/go-log/log"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

Expand Down Expand Up @@ -99,7 +98,7 @@ func (s *canalTestSuite) SetupSuite() {

s.execute("SET GLOBAL binlog_format = 'ROW'")

s.c.SetEventHandler(&testEventHandler{})
s.c.SetEventHandler(&testEventHandler{T: s.T()})
go func() {
set, _ := mysql.ParseGTIDSet("mysql", "")
err = s.c.StartFromGTID(set)
Expand All @@ -126,10 +125,11 @@ func (s *canalTestSuite) execute(query string, args ...interface{}) *mysql.Resul

type testEventHandler struct {
DummyEventHandler
T *testing.T
}

func (h *testEventHandler) OnRow(e *RowsEvent) error {
log.Infof("OnRow %s %v\n", e.Action, e.Rows)
h.T.Log("OnRow", e.Action, e.Rows)
umi, ok := e.Rows[0][4].(uint32) // 4th col is umi. mysqldump gives uint64 instead of uint32
if ok && (umi != umiA && umi != umiB && umi != umiC) {
return fmt.Errorf("invalid unsigned medium int %d", umi)
Expand Down
8 changes: 3 additions & 5 deletions canal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package canal

import (
"crypto/tls"
"log/slog"
"math/rand"
"net"
"os"
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/siddontang/go-log/log"
"github.com/siddontang/go-log/loggers"

"github.com/go-mysql-org/go-mysql/client"
"github.com/go-mysql-org/go-mysql/mysql"
Expand Down Expand Up @@ -101,7 +100,7 @@ type Config struct {
TLSConfig *tls.Config

// Set Logger
Logger loggers.Advanced
Logger *slog.Logger

// Set Dialer
Dialer client.Dialer
Expand Down Expand Up @@ -150,8 +149,7 @@ func NewDefaultConfig() *Config {
c.Dump.DiscardErr = true
c.Dump.SkipMasterData = false

streamHandler, _ := log.NewStreamHandler(os.Stdout)
c.Logger = log.NewDefault(streamHandler)
c.Logger = slog.Default()

dialer := &net.Dialer{}
c.Dialer = dialer.DialContext
Expand Down
10 changes: 5 additions & 5 deletions canal/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package canal
import (
"encoding/hex"
"fmt"
"log/slog"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -49,7 +50,7 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error
e == schema.ErrMissingTableMeta {
return nil
}
h.c.cfg.Logger.Errorf("get %s.%s information err: %v", db, table, err)
h.c.cfg.Logger.Error("error getting table information", slog.String("database", db), slog.String("table", table), slog.Any("error", err))
return errors.Trace(err)
}

Expand Down Expand Up @@ -163,7 +164,7 @@ func (c *Canal) dump() error {
if err != nil {
return errors.Trace(err)
}
c.cfg.Logger.Infof("skip master data, get current binlog position %v", pos)
c.cfg.Logger.Info("skip master data, get current binlog position", slog.Any("position", pos))
h.name = pos.Name
h.pos = uint64(pos.Pos)
}
Expand All @@ -185,8 +186,7 @@ func (c *Canal) dump() error {
c.master.UpdateGTIDSet(h.gset)
startPos = h.gset
}
c.cfg.Logger.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at %s",
time.Since(start).Seconds(), startPos)
c.cfg.Logger.Info("dump MySQL and parse OK", slog.Duration("use", time.Since(start)), slog.String("position", startPos.String()))
return nil
}

Expand All @@ -196,7 +196,7 @@ func (c *Canal) tryDump() error {
if (len(pos.Name) > 0 && pos.Pos > 0) ||
(gset != nil && gset.String() != "") {
// we will sync with binlog name and position
c.cfg.Logger.Infof("skip dump, use last binlog replication pos %s or GTID set %v", pos, gset)
c.cfg.Logger.Info("skip dump, use last binlog replication position or GTID set", slog.String("file", pos.Name), slog.Uint64("position", uint64(pos.Pos)), slog.Any("GTID set", gset))
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions canal/master.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package canal

import (
"log/slog"
"sync"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/siddontang/go-log/loggers"
)

type masterInfo struct {
Expand All @@ -16,27 +16,27 @@ type masterInfo struct {

timestamp uint32

logger loggers.Advanced
logger *slog.Logger
}

func (m *masterInfo) Update(pos mysql.Position) {
m.logger.Debugf("update master position %s", pos)
m.logger.Debug("update master position", slog.Any("pos", pos))

m.Lock()
m.pos = pos
m.Unlock()
}

func (m *masterInfo) UpdateTimestamp(ts uint32) {
m.logger.Debugf("update master timestamp %d", ts)
m.logger.Debug("update master timestamp", slog.Int64("ts", int64(ts)))

m.Lock()
m.timestamp = ts
m.Unlock()
}

func (m *masterInfo) UpdateGTIDSet(gset mysql.GTIDSet) {
m.logger.Debugf("update master gtid set %s", gset)
m.logger.Debug("update master gtid set", slog.Any("gset", gset))

m.Lock()
m.gset = gset
Expand Down
23 changes: 12 additions & 11 deletions canal/sync.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package canal

import (
"log/slog"
"sync/atomic"
"time"

Expand All @@ -20,15 +21,15 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
if err != nil {
return nil, errors.Errorf("start sync replication at binlog %v error %v", pos, err)
}
c.cfg.Logger.Infof("start sync binlog at binlog file %v", pos)
c.cfg.Logger.Info("start sync binlog at binlog file", slog.Any("pos", pos))
return s, nil
} else {
gsetClone := gset.Clone()
s, err := c.syncer.StartSyncGTID(gset)
if err != nil {
return nil, errors.Errorf("start sync replication at GTID set %v error %v", gset, err)
}
c.cfg.Logger.Infof("start sync binlog at GTID set %v", gsetClone)
c.cfg.Logger.Info("start sync binlog at GTID set", slog.Any("gset", gsetClone))
return s, nil
}
}
Expand Down Expand Up @@ -57,7 +58,7 @@ func (c *Canal) runSyncBinlog() error {
// and https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/rpl_binlog_sender.cc#L899
if ev.Header.Timestamp == 0 {
fakeRotateLogName := string(e.NextLogName)
c.cfg.Logger.Infof("received fake rotate event, next log name is %s", e.NextLogName)
c.cfg.Logger.Info("received fake rotate event", slog.String("nextLogName", string(e.NextLogName)))

if fakeRotateLogName != c.master.Position().Name {
c.cfg.Logger.Info("log name changed, the fake rotate event will be handled as a real rotate event")
Expand Down Expand Up @@ -93,17 +94,16 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
case *replication.RotateEvent:
pos.Name = string(e.NextLogName)
pos.Pos = uint32(e.Position)
c.cfg.Logger.Infof("rotate binlog to %s", pos)
c.cfg.Logger.Info("rotate binlog", slog.Any("pos", pos))
savePos = true
force = true
if err = c.eventHandler.OnRotate(ev.Header, e); err != nil {
return errors.Trace(err)
}
case *replication.RowsEvent:
// we only focus row based event
err = c.handleRowsEvent(ev)
if err != nil {
c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
if err := c.handleRowsEvent(ev); err != nil {
c.cfg.Logger.Error("handle rows event", slog.String("file", pos.Name), slog.Uint64("position", uint64(curPos)), slog.Any("error", err))
return errors.Trace(err)
}
return nil
Expand All @@ -113,7 +113,7 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
for _, subEvent := range ev.Events {
err = c.handleEvent(subEvent)
if err != nil {
c.cfg.Logger.Errorf("handle transaction payload subevent at (%s, %d) error %v", pos.Name, curPos, err)
c.cfg.Logger.Error("handle transaction payload subevent", slog.String("file", pos.Name), slog.Uint64("position", uint64(curPos)), slog.Any("error", err))
return errors.Trace(err)
}
}
Expand Down Expand Up @@ -144,7 +144,7 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
if err != nil {
// The parser does not understand all syntax.
// For example, it won't parse [CREATE|DROP] TRIGGER statements.
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
c.cfg.Logger.Error("error parsing query, will skip this event", slog.String("query", string(e.Query)), slog.Any("error", err))
return nil
}
if len(stmts) > 0 {
Expand Down Expand Up @@ -246,7 +246,7 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) {

func (c *Canal) updateTable(header *replication.EventHeader, db, table string) (err error) {
c.ClearTableCache([]byte(db), []byte(table))
c.cfg.Logger.Infof("table structure changed, clear table cache: %s.%s\n", db, table)
c.cfg.Logger.Info("table structure changed, clear table cache", slog.String("database", db), slog.String("table", table))
if err = c.eventHandler.OnTableChanged(header, db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist {
return errors.Trace(err)
}
Expand Down Expand Up @@ -316,7 +316,8 @@ func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error {
if curPos.Compare(pos) >= 0 {
return nil
} else {
c.cfg.Logger.Debugf("master pos is %v, wait catching %v", curPos, pos)
c.cfg.Logger.Debug("master pos is behind, wait to catch up", slog.String("master file", curPos.Name), slog.Uint64("master position", uint64(curPos.Pos)),
slog.String("target file", pos.Name), slog.Uint64("target position", uint64(curPos.Pos)))
time.Sleep(100 * time.Millisecond)
}
}
Expand Down
Loading
Loading