Skip to content

Commit e9b5e69

Browse files
garyrussellartembilan
authored andcommitted
GH-2195: Fix No Seek Retries with Non-Blocking
Related to #2195 Do not process the remaining records if the first record is from a paused partition.
1 parent 20cbc75 commit e9b5e69

File tree

2 files changed

+246
-2
lines changed

2 files changed

+246
-2
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1566,8 +1566,10 @@ private ConsumerRecords<K, V> doPoll() {
15661566
+ "after an error; emergency stop invoked to avoid message loss", howManyRecords));
15671567
KafkaMessageListenerContainer.this.emergencyStop.run();
15681568
}
1569-
records = this.pendingRecordsAfterError;
1570-
this.pendingRecordsAfterError = null;
1569+
if (!isPartitionPaused(this.pendingRecordsAfterError.partitions().iterator().next())) {
1570+
records = this.pendingRecordsAfterError;
1571+
this.pendingRecordsAfterError = null;
1572+
}
15711573
}
15721574
captureOffsets(records);
15731575
checkRebalanceCommits();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.anyLong;
22+
import static org.mockito.ArgumentMatchers.anyMap;
23+
import static org.mockito.BDDMockito.given;
24+
import static org.mockito.BDDMockito.willAnswer;
25+
import static org.mockito.Mockito.inOrder;
26+
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.never;
28+
import static org.mockito.Mockito.verify;
29+
30+
import java.time.Duration;
31+
import java.util.ArrayList;
32+
import java.util.Arrays;
33+
import java.util.Collection;
34+
import java.util.Collections;
35+
import java.util.HashSet;
36+
import java.util.LinkedHashMap;
37+
import java.util.List;
38+
import java.util.Map;
39+
import java.util.Optional;
40+
import java.util.concurrent.CountDownLatch;
41+
import java.util.concurrent.TimeUnit;
42+
import java.util.concurrent.atomic.AtomicInteger;
43+
44+
import org.apache.kafka.clients.consumer.Consumer;
45+
import org.apache.kafka.clients.consumer.ConsumerRecord;
46+
import org.apache.kafka.clients.consumer.ConsumerRecords;
47+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
48+
import org.apache.kafka.common.TopicPartition;
49+
import org.apache.kafka.common.header.internals.RecordHeaders;
50+
import org.apache.kafka.common.record.TimestampType;
51+
import org.junit.jupiter.api.Test;
52+
import org.mockito.InOrder;
53+
54+
import org.springframework.beans.factory.annotation.Autowired;
55+
import org.springframework.context.annotation.Bean;
56+
import org.springframework.context.annotation.Configuration;
57+
import org.springframework.kafka.annotation.EnableKafka;
58+
import org.springframework.kafka.annotation.KafkaListener;
59+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
60+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
61+
import org.springframework.kafka.core.ConsumerFactory;
62+
import org.springframework.kafka.listener.ContainerProperties.AckMode;
63+
import org.springframework.kafka.support.KafkaHeaders;
64+
import org.springframework.kafka.test.utils.KafkaTestUtils;
65+
import org.springframework.messaging.handler.annotation.Header;
66+
import org.springframework.test.annotation.DirtiesContext;
67+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
68+
69+
/**
70+
* @author Gary Russell
71+
* @since 2.9
72+
*
73+
*/
74+
@SpringJUnitConfig
75+
@DirtiesContext
76+
public class DefaultErrorHandlerNoSeeksRecordAckNoResumeTests {
77+
78+
@SuppressWarnings("rawtypes")
79+
@Autowired
80+
private Consumer consumer;
81+
82+
@Autowired
83+
private Config config;
84+
85+
@Autowired
86+
private KafkaListenerEndpointRegistry registry;
87+
88+
@SuppressWarnings("unchecked")
89+
@Test
90+
public void doesNotResumeIfPartitionPaused() throws Exception {
91+
assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
92+
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
93+
assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
94+
this.registry.stop();
95+
assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
96+
InOrder inOrder = inOrder(this.consumer);
97+
inOrder.verify(this.consumer).assign(any(Collection.class));
98+
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
99+
inOrder.verify(this.consumer).commitSync(
100+
Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(1L)),
101+
Duration.ofSeconds(60));
102+
inOrder.verify(this.consumer).commitSync(
103+
Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(2L)),
104+
Duration.ofSeconds(60));
105+
inOrder.verify(this.consumer).commitSync(
106+
Collections.singletonMap(new TopicPartition("foo", 1), new OffsetAndMetadata(1L)),
107+
Duration.ofSeconds(60));
108+
inOrder.verify(this.consumer).pause(any());
109+
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
110+
verify(this.consumer, never()).resume(any());
111+
assertThat(this.config.count).isEqualTo(4);
112+
assertThat(this.config.contents).contains("foo", "bar", "baz", "qux");
113+
assertThat(this.config.deliveries).contains(1, 1, 1, 1);
114+
assertThat(this.config.deliveryAttempt).isNotNull();
115+
verify(this.consumer, never()).seek(any(), anyLong());
116+
}
117+
118+
@Configuration
119+
@EnableKafka
120+
public static class Config {
121+
122+
final List<String> contents = new ArrayList<>();
123+
124+
final List<Integer> deliveries = new ArrayList<>();
125+
126+
final CountDownLatch pollLatch = new CountDownLatch(4);
127+
128+
final CountDownLatch deliveryLatch = new CountDownLatch(4);
129+
130+
final CountDownLatch closeLatch = new CountDownLatch(1);
131+
132+
final CountDownLatch commitLatch = new CountDownLatch(3);
133+
134+
int count;
135+
136+
volatile org.apache.kafka.common.header.Header deliveryAttempt;
137+
138+
@KafkaListener(id = "id", groupId = "grp",
139+
topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
140+
partitions = "#{'0,1,2'.split(',')}"))
141+
public void foo(String in, @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
142+
this.contents.add(in);
143+
this.deliveries.add(delivery);
144+
this.deliveryLatch.countDown();
145+
if (++this.count == 4 || this.count == 5) { // part 1, offset 1, first and second times
146+
throw new RuntimeException("foo");
147+
}
148+
}
149+
150+
@SuppressWarnings({ "rawtypes" })
151+
@Bean
152+
public ConsumerFactory consumerFactory(KafkaListenerEndpointRegistry registry) {
153+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
154+
final Consumer consumer = consumer(registry);
155+
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
156+
.willReturn(consumer);
157+
return consumerFactory;
158+
}
159+
160+
@SuppressWarnings({ "rawtypes", "unchecked" })
161+
@Bean
162+
public Consumer consumer(KafkaListenerEndpointRegistry registry) {
163+
final Consumer consumer = mock(Consumer.class);
164+
final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
165+
final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
166+
final TopicPartition topicPartition2 = new TopicPartition("foo", 2);
167+
Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
168+
records1.put(topicPartition0, Arrays.asList(
169+
new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
170+
new RecordHeaders(), Optional.empty()),
171+
new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar",
172+
new RecordHeaders(), Optional.empty())));
173+
records1.put(topicPartition1, Arrays.asList(
174+
new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz",
175+
new RecordHeaders(), Optional.empty()),
176+
new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
177+
new RecordHeaders(), Optional.empty())));
178+
records1.put(topicPartition2, Arrays.asList(
179+
new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz",
180+
new RecordHeaders(), Optional.empty()),
181+
new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz",
182+
new RecordHeaders(), Optional.empty())));
183+
final AtomicInteger which = new AtomicInteger();
184+
willAnswer(i -> {
185+
this.pollLatch.countDown();
186+
switch (which.getAndIncrement()) {
187+
case 0:
188+
return new ConsumerRecords(records1);
189+
default:
190+
try {
191+
Thread.sleep(50);
192+
}
193+
catch (InterruptedException e) {
194+
Thread.currentThread().interrupt();
195+
}
196+
return new ConsumerRecords(Collections.emptyMap());
197+
}
198+
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
199+
List<TopicPartition> paused = new ArrayList<>();
200+
willAnswer(i -> {
201+
this.commitLatch.countDown();
202+
registry.getListenerContainer("id").pausePartition(topicPartition1);
203+
return null;
204+
}).given(consumer).commitSync(anyMap(), any());
205+
willAnswer(i -> {
206+
this.closeLatch.countDown();
207+
return null;
208+
}).given(consumer).close();
209+
willAnswer(i -> {
210+
paused.addAll(i.getArgument(0));
211+
return null;
212+
}).given(consumer).pause(any());
213+
willAnswer(i -> {
214+
return new HashSet<>(paused);
215+
}).given(consumer).paused();
216+
willAnswer(i -> {
217+
paused.removeAll(i.getArgument(0));
218+
return null;
219+
}).given(consumer).resume(any());
220+
return consumer;
221+
}
222+
223+
@SuppressWarnings({ "rawtypes", "unchecked" })
224+
@Bean
225+
ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(KafkaListenerEndpointRegistry registry) {
226+
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
227+
factory.setConsumerFactory(consumerFactory(registry));
228+
factory.getContainerProperties().setAckMode(AckMode.RECORD);
229+
factory.getContainerProperties().setDeliveryAttemptHeader(true);
230+
factory.setRecordInterceptor((record, consumer) -> {
231+
Config.this.deliveryAttempt = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
232+
return record;
233+
});
234+
DefaultErrorHandler eh = new DefaultErrorHandler();
235+
eh.setSeekAfterError(false);
236+
factory.setCommonErrorHandler(eh);
237+
return factory;
238+
}
239+
240+
}
241+
242+
}

0 commit comments

Comments
 (0)