@@ -151,7 +151,7 @@ func (worker *worker) extractBusMessage(delivery amqp.Delivery) (*BusMessage, er
151
151
var decErr error
152
152
bm .Payload , decErr = worker .serializer .Decode (delivery .Body , bm .PayloadFQN )
153
153
if decErr != nil {
154
- worker .log ().WithError (decErr ).WithField ("message " , delivery ).Error ("failed to decode message. rejected as poison" )
154
+ worker .log ().WithError (decErr ).WithField ("fqn " , bm . PayloadFQN ).Error ("failed to decode message. rejected as poison" )
155
155
return nil , decErr
156
156
}
157
157
return bm , nil
@@ -337,13 +337,13 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
337
337
// each retry should run a new and separate transaction which should end with a commit or rollback
338
338
339
339
action := func (attempt uint ) (actionErr error ) {
340
-
340
+
341
341
tx , txCreateErr := worker .txProvider .New ()
342
- if txCreateErr != nil {
343
- worker .log ().WithError (txCreateErr ).Error ("failed creating new tx" )
344
- worker .span .LogFields (slog .Error (txCreateErr ))
345
- return txCreateErr
346
- }
342
+ if txCreateErr != nil {
343
+ worker .log ().WithError (txCreateErr ).Error ("failed creating new tx" )
344
+ worker .span .LogFields (slog .Error (txCreateErr ))
345
+ return txCreateErr
346
+ }
347
347
348
348
worker .span , sctx = opentracing .StartSpanFromContext (sctx , "invokeHandlers" )
349
349
worker .span .LogFields (slog .Uint64 ("attempt" , uint64 (attempt + 1 )))
@@ -353,9 +353,9 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
353
353
worker .log ().WithField ("stack" , pncMsg ).Error ("recovered from panic while invoking handler" )
354
354
actionErr = errors .New (pncMsg )
355
355
rbkErr := tx .Rollback ()
356
- if rbkErr != nil {
357
- worker .log ().WithError (rbkErr ).Error ("failed rolling back transaction when recovering from handler panic" )
358
- }
356
+ if rbkErr != nil {
357
+ worker .log ().WithError (rbkErr ).Error ("failed rolling back transaction when recovering from handler panic" )
358
+ }
359
359
worker .span .LogFields (slog .Error (actionErr ))
360
360
}
361
361
worker .span .Finish ()
@@ -392,17 +392,17 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
392
392
if handlerErr != nil {
393
393
hspan .LogFields (slog .Error (handlerErr ))
394
394
rbkErr := tx .Rollback ()
395
- if rbkErr != nil {
396
- worker .log ().WithError (rbkErr ).Error ("failed rolling back transaction when recovering from handler error" )
397
- }
395
+ if rbkErr != nil {
396
+ worker .log ().WithError (rbkErr ).Error ("failed rolling back transaction when recovering from handler error" )
397
+ }
398
398
hspan .Finish ()
399
399
return handlerErr
400
400
}
401
401
cmtErr := tx .Commit ()
402
- if cmtErr != nil {
403
- worker .log ().WithError (cmtErr ).Error ("failed committing transaction after invoking handlers" )
404
- return cmtErr
405
- }
402
+ if cmtErr != nil {
403
+ worker .log ().WithError (cmtErr ).Error ("failed committing transaction after invoking handlers" )
404
+ return cmtErr
405
+ }
406
406
return nil
407
407
}
408
408
0 commit comments