Skip to content

Commit 2c5c88c

Browse files
committed
GH-2622: Fix Possible NPE
MLC can be null. Other Sonar fixes.
1 parent 343f558 commit 2c5c88c

File tree

4 files changed

+57
-21
lines changed

4 files changed

+57
-21
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPausingBackOffHandler.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
*/
3030
public class ContainerPausingBackOffHandler implements BackOffHandler {
3131

32+
private final DefaultBackOffHandler defaultBackOffHandler = new DefaultBackOffHandler();
33+
3234
private final ListenerContainerPauseService pauser;
3335

3436
/**
@@ -41,7 +43,12 @@ public ContainerPausingBackOffHandler(ListenerContainerPauseService pauser) {
4143

4244
@Override
4345
public void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) {
44-
this.pauser.pause(container, Duration.ofMillis(nextBackOff));
46+
if (container == null) {
47+
this.defaultBackOffHandler.onNextBackOff(container, exception, nextBackOff);
48+
}
49+
else {
50+
this.pauser.pause(container, Duration.ofMillis(nextBackOff));
51+
}
4552
}
4653

4754
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.springframework.lang.Nullable;
20+
21+
/**
22+
* Default {@link BackOffHandler}; suspends the thread for the back off. If a container is
23+
* provided, {@link ListenerUtils#stoppableSleep(MessageListenerContainer, long)} is used,
24+
* to terminate the suspension if the container is stopped.
25+
*
26+
* @author Jan Marincek
27+
* @author Gary Russell
28+
* @since 2.9
29+
*/
30+
public class DefaultBackOffHandler implements BackOffHandler {
31+
32+
@Override
33+
public void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) {
34+
try {
35+
if (container == null) {
36+
Thread.sleep(nextBackOff);
37+
}
38+
else {
39+
ListenerUtils.stoppableSleep(container, nextBackOff);
40+
}
41+
}
42+
catch (InterruptedException e) {
43+
Thread.currentThread().interrupt();
44+
}
45+
}
46+
47+
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

-17
Original file line numberDiff line numberDiff line change
@@ -307,21 +307,4 @@ void setLastException(Exception lastException) {
307307

308308
}
309309

310-
static class DefaultBackOffHandler implements BackOffHandler {
311-
@Override
312-
public void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) {
313-
try {
314-
if (container == null) {
315-
Thread.sleep(nextBackOff);
316-
}
317-
else {
318-
ListenerUtils.stoppableSleep(container, nextBackOff);
319-
}
320-
}
321-
catch (InterruptedException e) {
322-
throw new RuntimeException(e);
323-
}
324-
}
325-
}
326-
327310
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerPauseService.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.commons.logging.LogFactory;
2525

2626
import org.springframework.core.log.LogAccessor;
27-
import org.springframework.lang.NonNull;
2827
import org.springframework.lang.Nullable;
2928
import org.springframework.scheduling.TaskScheduler;
3029
import org.springframework.util.Assert;
@@ -93,7 +92,7 @@ public void pause(MessageListenerContainer messageListenerContainer, Duration pa
9392
* Resume the listener container by given id.
9493
* @param listenerId the id of the listener
9594
*/
96-
public void resume(@NonNull String listenerId) {
95+
public void resume(String listenerId) {
9796
Assert.notNull(this.registry, "Resume by id is only supported when a registry is provided");
9897
getListenerContainer(listenerId).ifPresent(this::resume);
9998
}
@@ -102,7 +101,7 @@ public void resume(@NonNull String listenerId) {
102101
* Resume the listener container.
103102
* @param messageListenerContainer the listener container
104103
*/
105-
public void resume(@NonNull MessageListenerContainer messageListenerContainer) {
104+
public void resume(MessageListenerContainer messageListenerContainer) {
106105
if (messageListenerContainer.isPauseRequested()) {
107106
LOGGER.debug(() -> "Resuming container " + messageListenerContainer);
108107
messageListenerContainer.resume();

0 commit comments

Comments
 (0)