Skip to content

Commit 3f645b1

Browse files
authored
GH-2558: Fix Possible NPE in FailedBatchProcessor
Resolves #2558 If a user throws a `BatchListenerExcecutionFailedException` with an invalid `index`, an NPE is logged instead of the true error. **cherry-pick to 2.9.x**
1 parent 9c22868 commit 3f645b1

File tree

2 files changed

+129
-4
lines changed

2 files changed

+129
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

+12-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2022 the original author or authors.
2+
* Copyright 2021-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -155,9 +155,17 @@ protected <K, V> ConsumerRecords<K, V> handle(Exception thrownException, Consume
155155
ConsumerRecord<?, ?> record = batchListenerFailedException.getRecord();
156156
int index = record != null ? findIndex(data, record) : batchListenerFailedException.getIndex();
157157
if (index < 0 || index >= data.count()) {
158-
this.logger.warn(batchListenerFailedException, () ->
159-
String.format("Record not found in batch: %s-%d@%d; re-seeking batch",
160-
record.topic(), record.partition(), record.offset()));
158+
this.logger.warn(batchListenerFailedException, () -> {
159+
if (record != null) {
160+
return String.format("Record not found in batch: %s-%d@%d; re-seeking batch",
161+
record.topic(), record.partition(), record.offset());
162+
}
163+
else {
164+
return String.format("Record not found in batch, index %d out of bounds (0, %d); "
165+
+ "re-seeking batch", index, data.count() - 1);
166+
167+
}
168+
});
161169
fallback(thrownException, data, consumer, container, invokeListener);
162170
}
163171
else {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.BDDMockito.willThrow;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.spy;
25+
import static org.mockito.Mockito.verify;
26+
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.function.BiConsumer;
30+
import java.util.function.Supplier;
31+
32+
import org.apache.commons.logging.LogFactory;
33+
import org.apache.kafka.clients.consumer.Consumer;
34+
import org.apache.kafka.clients.consumer.ConsumerRecord;
35+
import org.apache.kafka.clients.consumer.ConsumerRecords;
36+
import org.apache.kafka.common.TopicPartition;
37+
import org.junit.jupiter.api.Test;
38+
import org.mockito.ArgumentCaptor;
39+
40+
import org.springframework.core.log.LogAccessor;
41+
import org.springframework.data.util.DirectFieldAccessFallbackBeanWrapper;
42+
import org.springframework.util.backoff.BackOff;
43+
import org.springframework.util.backoff.FixedBackOff;
44+
45+
/**
46+
* @author Gary Russell
47+
* @since 3.0.3
48+
*
49+
*/
50+
public class FailedBatchProcessorTests {
51+
52+
@SuppressWarnings({ "rawtypes", "unchecked" })
53+
@Test
54+
void indexOutOfBounds() {
55+
class TestFBP extends FailedBatchProcessor {
56+
57+
TestFBP(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
58+
CommonErrorHandler fallbackHandler) {
59+
60+
super(recoverer, backOff, fallbackHandler);
61+
}
62+
63+
}
64+
CommonErrorHandler mockEH = mock(CommonErrorHandler.class);
65+
willThrow(new IllegalStateException("fallback")).given(mockEH).handleBatch(any(), any(), any(), any(), any());
66+
67+
TestFBP testFBP = new TestFBP((rec, ex) -> { }, new FixedBackOff(0L, 0L), mockEH);
68+
LogAccessor logger = spy(new LogAccessor(LogFactory.getLog("test")));
69+
new DirectFieldAccessFallbackBeanWrapper(testFBP).setPropertyValue("logger", logger);
70+
71+
72+
ConsumerRecords records = new ConsumerRecords(Map.of(new TopicPartition("topic", 0),
73+
List.of(mock(ConsumerRecord.class), mock(ConsumerRecord.class))));
74+
assertThatIllegalStateException().isThrownBy(() -> testFBP.handle(new BatchListenerFailedException("test", 3),
75+
records, mock(Consumer.class), mock(MessageListenerContainer.class), mock(Runnable.class)))
76+
.withMessage("fallback");
77+
ArgumentCaptor<Supplier<String>> captor = ArgumentCaptor.forClass(Supplier.class);
78+
verify(logger).warn(any(BatchListenerFailedException.class), captor.capture());
79+
String output = captor.getValue().get();
80+
assertThat(output).contains("Record not found in batch, index 3 out of bounds (0, 1);");
81+
}
82+
83+
@SuppressWarnings({ "rawtypes", "unchecked" })
84+
@Test
85+
void recordNotPresent() {
86+
class TestFBP extends FailedBatchProcessor {
87+
88+
TestFBP(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
89+
CommonErrorHandler fallbackHandler) {
90+
91+
super(recoverer, backOff, fallbackHandler);
92+
}
93+
94+
}
95+
CommonErrorHandler mockEH = mock(CommonErrorHandler.class);
96+
willThrow(new IllegalStateException("fallback")).given(mockEH).handleBatch(any(), any(), any(), any(), any());
97+
98+
TestFBP testFBP = new TestFBP((rec, ex) -> { }, new FixedBackOff(0L, 0L), mockEH);
99+
LogAccessor logger = spy(new LogAccessor(LogFactory.getLog("test")));
100+
new DirectFieldAccessFallbackBeanWrapper(testFBP).setPropertyValue("logger", logger);
101+
102+
103+
ConsumerRecord rec1 = new ConsumerRecord("topic", 0, 0L, null, null);
104+
ConsumerRecord rec2 = new ConsumerRecord("topic", 0, 1L, null, null);
105+
ConsumerRecords records = new ConsumerRecords(Map.of(new TopicPartition("topic", 0), List.of(rec1, rec2)));
106+
ConsumerRecord unknownRecord = new ConsumerRecord("topic", 42, 123L, null, null);
107+
assertThatIllegalStateException().isThrownBy(() ->
108+
testFBP.handle(new BatchListenerFailedException("topic", unknownRecord),
109+
records, mock(Consumer.class), mock(MessageListenerContainer.class), mock(Runnable.class)))
110+
.withMessage("fallback");
111+
ArgumentCaptor<Supplier<String>> captor = ArgumentCaptor.forClass(Supplier.class);
112+
verify(logger).warn(any(BatchListenerFailedException.class), captor.capture());
113+
String output = captor.getValue().get();
114+
assertThat(output).contains("Record not found in batch: topic-42@123;");
115+
}
116+
117+
}

0 commit comments

Comments
 (0)