Skip to content

Commit 00177c9

Browse files
adiweissGuy Baron
adiweiss
authored and
Guy Baron
committed
dead letter handler should reject messages on failures and rollbacks and ack on commit success (#105)
* dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks
1 parent f617e04 commit 00177c9

File tree

1 file changed

+13
-5
lines changed

1 file changed

+13
-5
lines changed

gbus/worker.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -235,22 +235,30 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) {
235235
if txCreateErr != nil {
236236
worker.log().WithError(txCreateErr).Error("failed creating new tx")
237237
worker.span.LogFields(slog.Error(txCreateErr))
238-
_ = worker.ack(delivery)
238+
_ = worker.reject(true, delivery)
239239
return
240240
}
241-
var fn func() error
242241
err := worker.deadletterHandler(tx, delivery)
242+
var reject bool
243243
if err != nil {
244244
worker.log().WithError(err).Error("failed handling deadletter")
245245
worker.span.LogFields(slog.Error(err))
246-
fn = tx.Rollback
246+
err = worker.SafeWithRetries(tx.Rollback, MaxRetryCount)
247+
reject = true
247248
} else {
248-
fn = tx.Commit
249+
err = worker.SafeWithRetries(tx.Commit, MaxRetryCount)
249250
}
250-
err = worker.SafeWithRetries(fn, MaxRetryCount)
251+
251252
if err != nil {
252253
worker.log().WithError(err).Error("Rollback/Commit deadletter handler message")
253254
worker.span.LogFields(slog.Error(err))
255+
reject = true
256+
}
257+
258+
if reject {
259+
_ = worker.reject(true, delivery)
260+
} else {
261+
_ = worker.ack(delivery)
254262
}
255263
}
256264

0 commit comments

Comments
 (0)