Skip to content

fixed logging issues #167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ type TxProvider interface {

//TxOutbox abstracts the transactional outgoing channel type
type TxOutbox interface {
Logged
Save(tx *sql.Tx, exchange, routingKey string, amqpMessage amqp.Publishing) error
Start(amqpOut *AMQPOutbox) error
Stop() error
Expand Down
2 changes: 2 additions & 0 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
}
}
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup)
gb.Outbox.SetLogger(gb.Log())
timeoutManager = mysql.NewTimeoutManager(gb, gb.TxProvider, gb.Log, svcName, builder.purgeOnStartup)

default:
Expand All @@ -107,6 +108,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
}
}
glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager)
glue.SetLogger(gb.Log())
gb.Glue = glue
return gb
}
Expand Down
3 changes: 3 additions & 0 deletions gbus/saga/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (imsm *Glue) invokeSagaInstance(def *Def, instance *Instance, invocation gb
}))

exchange, routingKey := invocation.Routing()
instance.logger = imsm.Log()
err := instance.invoke(exchange, routingKey, sginv, message)
if err != nil {
span.LogFields(slog.Error(err))
Expand Down Expand Up @@ -321,6 +322,8 @@ func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider,
timeoutManager: timeoutManager,
}

logged := &gbus.Glogged{}
g.Glogged = logged
timeoutManager.SetTimeoutFunction(g.TimeoutSaga)
return g
}
18 changes: 13 additions & 5 deletions gbus/saga/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Instance struct {
ConcurrencyCtrl int
UnderlyingInstance gbus.Saga
MsgToMethodMap []*MsgToFuncPair
Log logrus.FieldLogger
logger logrus.FieldLogger
/*
Will hold the service name that sent the command or event that started the saga
*/
Expand All @@ -36,6 +36,14 @@ type Instance struct {
StartedByRPCID string
}

func (si *Instance) log() logrus.FieldLogger {
if si.logger == nil {
return logrus.WithField("id", si.ID)
}

return si.logger
}

func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocation, message *gbus.BusMessage) error {

methodsToInvoke := si.getSagaMethodNameToInvoke(exchange, routingKey, message)
Expand All @@ -55,10 +63,10 @@ func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocati
params := make([]reflect.Value, 0)
params = append(params, reflect.ValueOf(invocation), valueOfMessage)
method := reflectedVal.MethodByName(methodName)
if invocation.Log() == nil {
if si.log() == nil {
panic("here")
}
invocation.Log().WithFields(logrus.Fields{
si.log().WithFields(logrus.Fields{
"method_name": methodName, "saga_id": si.ID,
}).Info("invoking method on saga")

Expand All @@ -78,13 +86,13 @@ func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocati
return val.Interface().(error)
}
return nil
}, methodName, message.PayloadFQN, invocation.Log())
}, methodName, message.PayloadFQN, si.log())

if err != nil {
return err
}

invocation.Log().WithFields(logrus.Fields{
si.log().WithFields(logrus.Fields{
"method_name": methodName, "saga_id": si.ID,
}).Info("saga instance invoked")
}
Expand Down
4 changes: 3 additions & 1 deletion gbus/tx/mysql/txoutbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (

//TxOutbox is a mysql based transactional outbox
type TxOutbox struct {
*gbus.Glogged
svcName string
txProv gbus.TxProvider
purgeOnStartup bool
Expand All @@ -44,7 +45,7 @@ type TxOutbox struct {
}

func (outbox *TxOutbox) log() *log.Entry {
return log.WithField("tx", "mysql")
return outbox.Log().WithField("tx", "mysql")
}

//Start starts the transactional outbox that is used to send messages in sync with domain object change
Expand Down Expand Up @@ -128,6 +129,7 @@ func NewOutbox(svcName string, txProv gbus.TxProvider, purgeOnStartup bool) *TxO
ack: make(chan uint64, 1000000),
nack: make(chan uint64, 1000000),
exit: make(chan bool)}
txo.Glogged = &gbus.Glogged{}
return txo
}

Expand Down