Skip to content

Commit ad48491

Browse files
committed
GH-2195: Fix Remaining ConsumerRecords and Test
Use a `LinkedHashMap` for the remaining records so that the order is retained. Fix test after removing some logic in the last commit.
1 parent b2b41d9 commit ad48491

File tree

3 files changed

+3
-9
lines changed

3 files changed

+3
-9
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -2776,7 +2776,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
27762776
else {
27772777
boolean handled = this.commonErrorHandler.handleOne(rte, record, this.consumer,
27782778
KafkaMessageListenerContainer.this.thisOrParentContainer);
2779-
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
2779+
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
27802780
if (!handled) {
27812781
records.computeIfAbsent(new TopicPartition(record.topic(), record.partition()),
27822782
tp -> new ArrayList<ConsumerRecord<K, V>>()).add(record);

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksRecordAckNoResumeContainerPausedTests.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,7 @@
6060
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
6161
import org.springframework.kafka.core.ConsumerFactory;
6262
import org.springframework.kafka.listener.ContainerProperties.AckMode;
63-
import org.springframework.kafka.support.KafkaHeaders;
6463
import org.springframework.kafka.test.utils.KafkaTestUtils;
65-
import org.springframework.messaging.handler.annotation.Header;
6664
import org.springframework.test.annotation.DirtiesContext;
6765
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
6866

@@ -110,7 +108,6 @@ public void doesNotResumeIfPartitionPaused() throws Exception {
110108
verify(this.consumer, never()).resume(any());
111109
assertThat(this.config.count).isEqualTo(4);
112110
assertThat(this.config.contents).contains("foo", "bar", "baz", "qux");
113-
assertThat(this.config.deliveries).contains(1, 1, 1, 1);
114111
verify(this.consumer, never()).seek(any(), anyLong());
115112
}
116113

@@ -120,8 +117,6 @@ public static class Config {
120117

121118
final List<String> contents = new ArrayList<>();
122119

123-
final List<Integer> deliveries = new ArrayList<>();
124-
125120
final CountDownLatch pollLatch = new CountDownLatch(4);
126121

127122
final CountDownLatch deliveryLatch = new CountDownLatch(4);
@@ -135,9 +130,8 @@ public static class Config {
135130
@KafkaListener(id = "id", groupId = "grp",
136131
topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
137132
partitions = "#{'0,1,2'.split(',')}"))
138-
public void foo(String in, @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
133+
public void foo(String in) {
139134
this.contents.add(in);
140-
this.deliveries.add(delivery);
141135
this.deliveryLatch.countDown();
142136
if (++this.count == 4 || this.count == 5) { // part 1, offset 1, first and second times
143137
throw new RuntimeException("foo");

Diff for: spring-kafka/src/test/resources/log4j2-test.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
</Console>
77
</Appenders>
88
<Loggers>
9-
<Logger name="org.springframework.kafka" level="warn"/>
9+
<Logger name="org.springframework.kafka" level="debug"/>
1010
<Logger name="org.springframework.kafka.ReplyingKafkaTemplate" level="warn"/>
1111
<Logger name="org.springframework.kafka.retrytopic" level="warn"/>
1212
<Logger name="org.apache.kafka.clients" level="warn"/>

0 commit comments

Comments
 (0)