diff --git a/gbus/worker.go b/gbus/worker.go index 5bb83f1..a88726a 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -189,6 +189,9 @@ func (worker *worker) reject(requeue bool, delivery amqp.Delivery) error { worker.log().WithError(err).Error("could not reject the message") worker.span.LogFields(slog.Error(err)) } + if !requeue { + metrics.ReportRejectedMessage() + } worker.log().WithFields(logrus.Fields{"message_id": delivery.MessageId, "requeue": requeue}).Info("message rejected") return err } @@ -286,6 +289,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { } worker.span.LogFields(slog.String("panic", "failed to process message")) logEntry.Error("failed to process message") + _ = worker.reject(false, delivery) } }() @@ -335,7 +339,6 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { _ = worker.ack(delivery) } else { _ = worker.reject(false, delivery) - metrics.ReportRejectedMessage() } } diff --git a/tests/bus_test.go b/tests/bus_test.go index dfd201c..824880f 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -228,7 +228,10 @@ func TestRPC(t *testing.T) { } func TestDeadlettering(t *testing.T) { - + rejectedMessages, err := metrics.GetRejectedMessagesValue() + if err != nil { + t.Error("failed to get rejected messages value") + } proceed := make(chan bool) poison := gbus.NewBusMessage(PoisonMessage{}) service1 := createNamedBusForTest(testSvc1) @@ -256,7 +259,7 @@ func TestDeadlettering(t *testing.T) { <-proceed count, _ := metrics.GetRejectedMessagesValue() - if count != 1 { + if count != rejectedMessages+1 { t.Error("Should have one rejected message") } @@ -343,6 +346,57 @@ func TestReturnDeadToQueue(t *testing.T) { } } +func TestDeadLetterHandlerPanic(t *testing.T) { + proceed := make(chan bool) + rejectedMessages, err := metrics.GetRejectedMessagesValue() + if err != nil { + t.Error("failed to get rejected messages value") + } + + poison := gbus.NewBusMessage(Command1{}) + service1 := createBusWithConfig(testSvc1, "grabbit-dead", true, true, + gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) + + deadletterSvc := createBusWithConfig("deadletterSvc", "grabbit-dead", true, true, + gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) + visited := false + deadMessageHandler := func(tx *sql.Tx, poison *amqp.Delivery) error { + if !visited { + visited = true + panic("PANIC DEAD HANDLER aaahhh!!!!!!") + } + proceed <- true + return nil + } + + faultyHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { + return errors.New("fail") + } + + deadletterSvc.HandleDeadletter(deadMessageHandler) + err = service1.HandleMessage(Command1{}, faultyHandler) + if err != nil { + t.Error("failed to register faultyhandler") + } + + deadletterSvc.Start() + defer deadletterSvc.Shutdown() + service1.Start() + defer service1.Shutdown() + + service1.Send(context.Background(), testSvc1, poison) + select { + case <-proceed: + count, _ := metrics.GetRejectedMessagesValue() + if count != rejectedMessages+2 { + t.Error("Should have 2 rejected messages") + } + case <-time.After(2 * time.Second): + t.Fatal("timeout, dlq failed to reject message after handler panicked") + } + +} + func TestRegistrationAfterBusStarts(t *testing.T) { event := Event1{} b := createBusForTest()