
Commit e63fc7f

GH-2677: Fix EH AckAfterHandle With Async Acks

Resolves spring-projects#2677

When an error handler handles an error (and `ackAfterHandle` is true), the ack bypassed the out-of-order commit logic, causing the consumer to be paused indefinitely due to the missing ack.

**cherry-pick to 2.9.x**
1 parent 9619aba commit e63fc7f
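
For context, a minimal sketch of the container setup that hits this path: async (out-of-order) acks with MANUAL ack mode and an error handler whose `ackAfterHandle` is left at its default of `true`. The class name and the anonymous handler here are illustrative, not part of this commit; the committed test below builds an equivalent configuration.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListenerContainer;

class AffectedSetupSketch {

    ConcurrentKafkaListenerContainerFactory<Integer, String> factory(ConsumerFactory<Integer, String> cf) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(cf);
        // Async acks allow the listener to acknowledge records out of order; the container
        // must then commit only contiguous runs of acknowledged offsets.
        factory.getContainerProperties().setAsyncAcks(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        // An error handler that reports the failed record as handled; ackAfterHandle
        // defaults to true, so the container acks that record itself after handling.
        factory.setCommonErrorHandler(new CommonErrorHandler() {

            @Override
            public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
                    Consumer<?, ?> consumer, MessageListenerContainer container) {
                return true; // record handled; the container will ack it
            }

        });
        return factory;
    }

}

Before this fix, that container-issued ack went through ackCurrent(), bypassing the out-of-order bookkeeping, so the gap at the failed record's offset was never closed.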

2 files changed (+149 -1 lines)


spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java (+6 -1)

@@ -2858,7 +2858,12 @@ private void commitOffsetsIfNeeded(final ConsumerRecord<K, V> cRecord) {
 			}
 			if (this.remainingRecords == null
 					|| !cRecord.equals(this.remainingRecords.iterator().next())) {
-				ackCurrent(cRecord);
+				if (this.offsetsInThisBatch != null) { // NOSONAR (sync)
+					ackInOrder(cRecord);
+				}
+				else {
+					ackCurrent(cRecord);
+				}
 			}
 			if (this.isManualAck) {
 				this.commitRecovered = false;
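
Why routing the ack through ackInOrder() matters: with async acks, the container only commits a contiguous prefix of the acknowledged offsets in the batch. Below is a simplified, hypothetical sketch of that bookkeeping (not the container's actual implementation, and the class name is invented); if one offset never reaches it, which is what happened when the handled record was acked via ackCurrent(), the commit position never advances and the paused partition is never resumed.

import java.util.TreeSet;

// Hypothetical sketch of in-order commit tracking; NOT spring-kafka's implementation.
class InOrderAckSketch {

    private final TreeSet<Long> acked = new TreeSet<>(); // acknowledged but not yet committable
    private long nextToCommit; // lowest offset not yet covered by a commit

    InOrderAckSketch(long firstOffsetInBatch) {
        this.nextToCommit = firstOffsetInBatch;
    }

    // Record an ack; returns the new commit position (one past the last contiguous
    // acknowledged offset), or -1 if an earlier offset is still outstanding.
    synchronized long acknowledge(long offset) {
        this.acked.add(offset);
        long newPosition = -1;
        while (!this.acked.isEmpty() && this.acked.first() == this.nextToCommit) {
            this.acked.pollFirst();
            newPosition = ++this.nextToCommit;
        }
        return newPosition;
    }

    public static void main(String[] args) {
        InOrderAckSketch tracker = new InOrderAckSketch(0);
        System.out.println(tracker.acknowledge(2)); // -1: offsets 0 and 1 still outstanding
        System.out.println(tracker.acknowledge(0)); // 1: offset 0 is now committable
        System.out.println(tracker.acknowledge(1)); // 3: offsets 1 and 2 become committable
        // If offset 1 were never acknowledged (the bug), the position would stall at 1
        // and the consumer would stay paused waiting for the missing ack.
    }

}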
spring-kafka/src/test/java/org/springframework/kafka/listener/AsyncAckAfterHandleTests.java (new file, +143)

/*
 * Copyright 2023 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.LogIfLevelEnabled;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
 * @author Gary Russell
 * @since 3.0
 *
 */
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = "asaah")
public class AsyncAckAfterHandleTests {

	@Test
	void testAckAfterHandlerAsync(@Autowired Config config, @Autowired KafkaTemplate<Integer, String> template)
			throws InterruptedException {

		for (int i = 0; i < 6; i++) {
			template.send("asaah", 0, null, "message contents");
		}
		assertThat(config.latch.await(10, TimeUnit.SECONDS))
				.describedAs("CountDownLatch.count=%d", config.latch.getCount())
				.isTrue();
	}

	@Configuration
	@EnableKafka
	public static class Config {

		private final CountDownLatch latch = new CountDownLatch(6);

		@KafkaListener(id = "asaah.id", topics = "asaah")
		public void onTestTopic(final ConsumerRecord<byte[], byte[]> record,
				final Acknowledgment acknowledgment) {

			accept(record, acknowledgment);
		}

		private void accept(final ConsumerRecord<byte[], byte[]> record,
				final Acknowledgment acknowledgment) {

			if (record.offset() == 1) {
				throw new RuntimeException("Exception for error handler");
			}
			else {
				this.latch.countDown();
				acknowledgment.acknowledge();
			}
		}

		@Bean
		public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
				ConsumerFactory<Integer, String> consumerFactory) {

			ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
			factory.setConsumerFactory(consumerFactory);
			factory.setConcurrency(1);
			factory.setCommonErrorHandler(new MreErrorHandler());
			factory.getContainerProperties().setAsyncAcks(true);
			factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
			factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.TRACE);
			return factory;
		}

		@Bean
		ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
			Map<String, Object> props = KafkaTestUtils.consumerProps("asaac.grp", "false", broker);
			props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
			return new DefaultKafkaConsumerFactory<>(props);
		}

		@Bean
		ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker broker) {
			Map<String, Object> props = KafkaTestUtils.producerProps(broker);
			props.put(ProducerConfig.LINGER_MS_CONFIG, 100L);
			return new DefaultKafkaProducerFactory<>(props);
		}

		@Bean
		KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> pf) {
			return new KafkaTemplate<>(pf);
		}

		public class MreErrorHandler implements CommonErrorHandler {

			@Override
			public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
					MessageListenerContainer container) {

				Config.this.latch.countDown();
				return true;
			}

		}

	}

}
