Skip to content

Commit f3aae64

Browse files
committed
Use separate logger for PulsarListener exceptions
This creates a separate logger for DefaultPulsarMessageListenerContainer that it uses to log exceptions thrown from listener callback methods. The exceptions are still logged at debug level in order to not change behavior in a patch release. However, the log category used by the logger can then be set to debug level but not spam the logs with the other debug statements in the listener container. Also, adds exception logging to the batch listener invocation using the same listener error logger as the record listener invocation. Resolves #1008
1 parent 147844b commit f3aae64

File tree

1 file changed

+8
-4
lines changed

1 file changed

+8
-4
lines changed

spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ public class DefaultPulsarMessageListenerContainer<T> extends AbstractPulsarMess
111111

112112
private final Condition pausedCondition = this.lockOnPause.newCondition();
113113

114+
private final LogAccessor listenerErrorLogger = new LogAccessor(
115+
"%s-ListenerErrors".formatted(DefaultPulsarMessageListenerContainer.class.getName()));
116+
114117
public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
115118
PulsarContainerProperties pulsarContainerProperties) {
116119
super(pulsarConsumerFactory, pulsarContainerProperties);
@@ -629,7 +632,7 @@ else if (this.listener != null) {
629632
inRetryMode.compareAndSet(true, false);
630633
}
631634
catch (RuntimeException e) {
632-
DefaultPulsarMessageListenerContainer.this.logger.debug(e,
635+
DefaultPulsarMessageListenerContainer.this.listenerErrorLogger.debug(e,
633636
() -> "Error dispatching the message to the listener.");
634637
if (this.pulsarConsumerErrorHandler != null) {
635638
invokeRecordListenerErrorHandler(inRetryMode, message, e, txn);
@@ -642,9 +645,8 @@ else if (this.ackMode.equals(AckMode.BATCH)) {
642645
this.nackableMessages.add(message.getMessageId());
643646
}
644647
else {
645-
throw new IllegalStateException("Exception occurred and message %s was not auto-nacked; "
646-
+ "switch to AckMode BATCH or RECORD to enable auto-nacks"
647-
.formatted(message.getMessageId()),
648+
throw new IllegalStateException("Exception occurred and message %".formatted(message.getMessageId())
649+
+ "was not auto-nacked; switch to AckMode BATCH or RECORD to enable auto-nacks",
648650
e);
649651
}
650652
}
@@ -713,6 +715,8 @@ private List<Message<T>> doInvokeBatchListener(Messages<T> messages, List<Messag
713715
return Collections.emptyList();
714716
}
715717
catch (RuntimeException ex) {
718+
DefaultPulsarMessageListenerContainer.this.listenerErrorLogger.debug(ex,
719+
() -> "Error dispatching the messages to the batch listener.");
716720
if (this.pulsarConsumerErrorHandler != null) {
717721
return invokeBatchListenerErrorHandler(inRetryMode, messagesPendingInBatch, messageList, ex, txn);
718722
}

0 commit comments

Comments
 (0)