Skip to content

Commit 3626f66

Browse files
committed
spring-projectsGH-2195: Fix No Seek Retry Mode
- Revert to seeking when `CommitFailedException` - due to a rebalance - For batch listeners, move committing to the try block - For record listeners, `AckMode.RECORD` already commits in the try block - With `AckMode.BATCH`, detect a no-seek retry and call the error handler `handleRemaining` **cherry-pick to 2.9.x**
1 parent a07ace9 commit 3626f66

5 files changed

+775
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+37-7
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.regex.Pattern;
4545
import java.util.stream.Collectors;
4646

47+
import org.apache.kafka.clients.consumer.CommitFailedException;
4748
import org.apache.kafka.clients.consumer.Consumer;
4849
import org.apache.kafka.clients.consumer.ConsumerConfig;
4950
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -1309,9 +1310,7 @@ private void initAssignedPartitions() {
13091310
}
13101311

13111312
protected void pollAndInvoke() {
1312-
if (!this.autoCommit && !this.isRecordAck) {
1313-
processCommits();
1314-
}
1313+
doProcessCommits();
13151314
fixTxOffsetsIfNeeded();
13161315
idleBetweenPollIfNecessary();
13171316
if (this.seeks.size() > 0) {
@@ -1346,6 +1345,27 @@ protected void pollAndInvoke() {
13461345
}
13471346
}
13481347

1348+
private void doProcessCommits() {
1349+
if (!this.autoCommit && !this.isRecordAck) {
1350+
try {
1351+
processCommits();
1352+
}
1353+
catch (CommitFailedException cfe) {
1354+
if (this.pendingRecordsAfterError != null && !this.isBatchListener) {
1355+
ConsumerRecords<K, V> pending = this.pendingRecordsAfterError;
1356+
this.pendingRecordsAfterError = null;
1357+
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
1358+
Iterator<ConsumerRecord<K, V>> iterator = pending.iterator();
1359+
while (iterator.hasNext()) {
1360+
records.add(iterator.next());
1361+
}
1362+
this.commonErrorHandler.handleRemaining(cfe, records, this.consumer,
1363+
KafkaMessageListenerContainer.this.thisOrParentContainer);
1364+
}
1365+
}
1366+
}
1367+
}
1368+
13491369
private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
13501370
if (records != null && records.count() > 0) {
13511371
this.receivedSome = true;
@@ -2118,6 +2138,9 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
21182138
}
21192139
getAfterRollbackProcessor().clearThreadState();
21202140
}
2141+
if (!this.autoCommit && !this.isRecordAck) {
2142+
processCommits();
2143+
}
21212144
}
21222145
catch (RuntimeException e) {
21232146
failureTimer(sample);
@@ -2307,7 +2330,9 @@ private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
23072330
private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
23082331
@Nullable List<ConsumerRecord<K, V>> list, RuntimeException rte) {
23092332

2310-
if (this.commonErrorHandler.seeksAfterHandling() || this.transactionManager != null) {
2333+
if (this.commonErrorHandler.seeksAfterHandling() || this.transactionManager != null
2334+
|| rte instanceof CommitFailedException) {
2335+
23112336
this.commonErrorHandler.handleBatch(rte, records, this.consumer,
23122337
KafkaMessageListenerContainer.this.thisOrParentContainer,
23132338
() -> invokeBatchOnMessageWithRecordsOrList(records, list));
@@ -2680,9 +2705,14 @@ record = this.recordInterceptor.intercept(record, this.consumer);
26802705
private void invokeErrorHandler(final ConsumerRecord<K, V> record,
26812706
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException rte) {
26822707

2683-
if (this.commonErrorHandler.seeksAfterHandling()) {
2684-
if (this.producer == null) {
2685-
processCommits();
2708+
if (this.commonErrorHandler.seeksAfterHandling() || rte instanceof CommitFailedException) {
2709+
try {
2710+
if (this.producer == null) {
2711+
processCommits();
2712+
}
2713+
}
2714+
catch (Exception ex) { // NO SONAR
2715+
this.logger.error(ex, "Failed to commit before handling error");
26862716
}
26872717
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
26882718
records.add(record);

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksBatchListenerTests.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.anyMap;
2122
import static org.mockito.ArgumentMatchers.isNull;
2223
import static org.mockito.BDDMockito.given;
2324
import static org.mockito.BDDMockito.willAnswer;
@@ -97,9 +98,10 @@ public class DefaultErrorHandlerNoSeeksBatchListenerTests {
9798
*/
9899
@SuppressWarnings("unchecked")
99100
@Test
100-
void retriesWithNoSeeksAckModeBatch() throws Exception {
101+
void retriesWithNoSeeksBatchListener() throws Exception {
101102
assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
102103
assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
104+
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
103105
this.registry.stop();
104106
assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
105107
InOrder inOrder = inOrder(this.consumer, this.producer);
@@ -110,11 +112,11 @@ void retriesWithNoSeeksAckModeBatch() throws Exception {
110112
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(1L));
111113
inOrder.verify(this.consumer).commitSync(offsets, Duration.ofSeconds(60));
112114
inOrder.verify(this.consumer).pause(any());
113-
inOrder.verify(this.consumer).resume(any());
114115
offsets = new LinkedHashMap<>();
115116
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
116117
offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
117118
inOrder.verify(this.consumer).commitSync(offsets, Duration.ofSeconds(60));
119+
inOrder.verify(this.consumer).resume(any());
118120
assertThat(this.config.ehException).isInstanceOf(ListenerExecutionFailedException.class);
119121
assertThat(((ListenerExecutionFailedException) this.config.ehException).getGroupId()).isEqualTo(CONTAINER_ID);
120122
assertThat(this.config.contents).contains("foo", "bar", "baz", "qux", "qux", "qux", "fiz", "buz");
@@ -130,6 +132,8 @@ public static class Config {
130132

131133
final CountDownLatch closeLatch = new CountDownLatch(1);
132134

135+
final CountDownLatch commitLatch = new CountDownLatch(2);
136+
133137
final AtomicBoolean fail = new AtomicBoolean(true);
134138

135139
final List<String> contents = new ArrayList<>();
@@ -199,6 +203,10 @@ public Consumer consumer() {
199203
return new ConsumerRecords(Collections.emptyMap());
200204
}
201205
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
206+
willAnswer(i -> {
207+
this.commitLatch.countDown();
208+
return null;
209+
}).given(consumer).commitSync(anyMap(), any());
202210
willAnswer(i -> {
203211
this.closeLatch.countDown();
204212
return null;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.anyLong;
22+
import static org.mockito.ArgumentMatchers.anyMap;
23+
import static org.mockito.BDDMockito.given;
24+
import static org.mockito.BDDMockito.willAnswer;
25+
import static org.mockito.Mockito.inOrder;
26+
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.spy;
28+
import static org.mockito.Mockito.times;
29+
import static org.mockito.Mockito.verify;
30+
31+
import java.time.Duration;
32+
import java.util.ArrayList;
33+
import java.util.Arrays;
34+
import java.util.Collection;
35+
import java.util.Collections;
36+
import java.util.HashSet;
37+
import java.util.LinkedHashMap;
38+
import java.util.List;
39+
import java.util.Map;
40+
import java.util.Optional;
41+
import java.util.concurrent.CountDownLatch;
42+
import java.util.concurrent.TimeUnit;
43+
import java.util.concurrent.atomic.AtomicBoolean;
44+
import java.util.concurrent.atomic.AtomicInteger;
45+
46+
import org.apache.kafka.clients.consumer.CommitFailedException;
47+
import org.apache.kafka.clients.consumer.Consumer;
48+
import org.apache.kafka.clients.consumer.ConsumerRecord;
49+
import org.apache.kafka.clients.consumer.ConsumerRecords;
50+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
51+
import org.apache.kafka.common.TopicPartition;
52+
import org.apache.kafka.common.header.internals.RecordHeaders;
53+
import org.apache.kafka.common.record.TimestampType;
54+
import org.junit.jupiter.api.Test;
55+
import org.mockito.InOrder;
56+
57+
import org.springframework.beans.factory.annotation.Autowired;
58+
import org.springframework.context.annotation.Bean;
59+
import org.springframework.context.annotation.Configuration;
60+
import org.springframework.kafka.annotation.EnableKafka;
61+
import org.springframework.kafka.annotation.KafkaListener;
62+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
63+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
64+
import org.springframework.kafka.core.ConsumerFactory;
65+
import org.springframework.kafka.listener.ContainerProperties.AckMode;
66+
import org.springframework.kafka.support.KafkaHeaders;
67+
import org.springframework.kafka.test.utils.KafkaTestUtils;
68+
import org.springframework.messaging.handler.annotation.Header;
69+
import org.springframework.test.annotation.DirtiesContext;
70+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
71+
72+
/**
73+
* @author Gary Russell
74+
* @since 2.9
75+
*
76+
*/
77+
@SpringJUnitConfig
78+
@DirtiesContext
79+
public class DefaultErrorHandlerSeekAfterCommitException {
80+
81+
@SuppressWarnings("rawtypes")
82+
@Autowired
83+
private Consumer consumer;
84+
85+
@Autowired
86+
private Config config;
87+
88+
@Autowired
89+
private KafkaListenerEndpointRegistry registry;
90+
91+
/*
92+
* Fail with commit exception - always seek.
93+
*/
94+
@SuppressWarnings("unchecked")
95+
@Test
96+
public void forceSeeksWithCommitException() throws Exception {
97+
assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
98+
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
99+
assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
100+
this.registry.stop();
101+
assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
102+
InOrder inOrder = inOrder(this.consumer, this.config.eh);
103+
inOrder.verify(this.consumer).assign(any(Collection.class));
104+
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
105+
inOrder.verify(this.consumer).commitSync(
106+
Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(1L)),
107+
Duration.ofSeconds(60));
108+
inOrder.verify(this.config.eh).handleRemaining(any(), any(), any(), any());
109+
verify(this.consumer, times(3)).seek(any(), anyLong());
110+
}
111+
112+
@Configuration
113+
@EnableKafka
114+
public static class Config {
115+
116+
final List<String> contents = new ArrayList<>();
117+
118+
final List<Integer> deliveries = new ArrayList<>();
119+
120+
final CountDownLatch pollLatch = new CountDownLatch(4);
121+
122+
final CountDownLatch deliveryLatch = new CountDownLatch(1);
123+
124+
final CountDownLatch closeLatch = new CountDownLatch(1);
125+
126+
final CountDownLatch commitLatch = new CountDownLatch(1);
127+
128+
DefaultErrorHandler eh;
129+
130+
int count;
131+
132+
volatile org.apache.kafka.common.header.Header deliveryAttempt;
133+
134+
@KafkaListener(groupId = "grp",
135+
topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
136+
partitions = "#{'0,1,2'.split(',')}"))
137+
public void foo(String in, @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
138+
this.contents.add(in);
139+
this.deliveries.add(delivery);
140+
this.deliveryLatch.countDown();
141+
if (++this.count == 4 || this.count == 5) { // part 1, offset 1, first and second times
142+
throw new RuntimeException("foo");
143+
}
144+
}
145+
146+
@SuppressWarnings({ "rawtypes" })
147+
@Bean
148+
public ConsumerFactory consumerFactory() {
149+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
150+
final Consumer consumer = consumer();
151+
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
152+
.willReturn(consumer);
153+
return consumerFactory;
154+
}
155+
156+
@SuppressWarnings({ "rawtypes", "unchecked" })
157+
@Bean
158+
public Consumer consumer() {
159+
final Consumer consumer = mock(Consumer.class);
160+
final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
161+
final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
162+
final TopicPartition topicPartition2 = new TopicPartition("foo", 2);
163+
Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
164+
records1.put(topicPartition0, Arrays.asList(
165+
new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
166+
new RecordHeaders(), Optional.empty()),
167+
new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar",
168+
new RecordHeaders(), Optional.empty())));
169+
records1.put(topicPartition1, Arrays.asList(
170+
new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz",
171+
new RecordHeaders(), Optional.empty()),
172+
new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
173+
new RecordHeaders(), Optional.empty())));
174+
records1.put(topicPartition2, Arrays.asList(
175+
new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz",
176+
new RecordHeaders(), Optional.empty()),
177+
new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz",
178+
new RecordHeaders(), Optional.empty())));
179+
final AtomicInteger which = new AtomicInteger();
180+
willAnswer(i -> {
181+
this.pollLatch.countDown();
182+
switch (which.getAndIncrement()) {
183+
case 0:
184+
return new ConsumerRecords(records1);
185+
default:
186+
try {
187+
Thread.sleep(50);
188+
}
189+
catch (InterruptedException e) {
190+
Thread.currentThread().interrupt();
191+
}
192+
return new ConsumerRecords(Collections.emptyMap());
193+
}
194+
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
195+
List<TopicPartition> paused = new ArrayList<>();
196+
AtomicBoolean first = new AtomicBoolean(true);
197+
willAnswer(i -> {
198+
this.commitLatch.countDown();
199+
if (first.getAndSet(false)) {
200+
throw new CommitFailedException();
201+
}
202+
return null;
203+
}).given(consumer).commitSync(anyMap(), any());
204+
willAnswer(i -> {
205+
this.closeLatch.countDown();
206+
return null;
207+
}).given(consumer).close();
208+
willAnswer(i -> {
209+
paused.addAll(i.getArgument(0));
210+
return null;
211+
}).given(consumer).pause(any());
212+
willAnswer(i -> {
213+
return new HashSet<>(paused);
214+
}).given(consumer).paused();
215+
willAnswer(i -> {
216+
paused.removeAll(i.getArgument(0));
217+
return null;
218+
}).given(consumer).resume(any());
219+
return consumer;
220+
}
221+
222+
@SuppressWarnings({ "rawtypes", "unchecked" })
223+
@Bean
224+
ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
225+
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
226+
factory.setConsumerFactory(consumerFactory());
227+
factory.getContainerProperties().setAckMode(AckMode.RECORD);
228+
factory.getContainerProperties().setDeliveryAttemptHeader(true);
229+
factory.setRecordInterceptor((record, consumer) -> {
230+
Config.this.deliveryAttempt = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
231+
return record;
232+
});
233+
this.eh = spy(new DefaultErrorHandler());
234+
this.eh.setSeekAfterError(false);
235+
factory.setCommonErrorHandler(eh);
236+
return factory;
237+
}
238+
239+
}
240+
241+
}

0 commit comments

Comments
 (0)