diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index 2a54860a03..1848c93810 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -44,6 +44,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -1309,9 +1310,7 @@ private void initAssignedPartitions() {
 		}
 
 		protected void pollAndInvoke() {
-			if (!this.autoCommit && !this.isRecordAck) {
-				processCommits();
-			}
+			doProcessCommits();
 			fixTxOffsetsIfNeeded();
 			idleBetweenPollIfNecessary();
 			if (this.seeks.size() > 0) {
@@ -1346,6 +1345,27 @@ protected void pollAndInvoke() {
 			}
 		}
 
+		private void doProcessCommits() {
+			if (!this.autoCommit && !this.isRecordAck) {
+				try {
+					processCommits();
+				}
+				catch (CommitFailedException cfe) {
+					if (this.pendingRecordsAfterError != null && !this.isBatchListener) {
+						ConsumerRecords<K, V> pending = this.pendingRecordsAfterError;
+						this.pendingRecordsAfterError = null;
+						List<ConsumerRecord<?, ?>> records = new ArrayList<>();
+						Iterator<ConsumerRecord<K, V>> iterator = pending.iterator();
+						while (iterator.hasNext()) {
+							records.add(iterator.next());
+						}
+						this.commonErrorHandler.handleRemaining(cfe, records, this.consumer,
+								KafkaMessageListenerContainer.this.thisOrParentContainer);
+					}
+				}
+			}
+		}
+
 		private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
 			if (records != null && records.count() > 0) {
 				this.receivedSome = true;
@@ -2118,6 +2138,9 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
 				}
 				getAfterRollbackProcessor().clearThreadState();
 			}
+			if (!this.autoCommit && !this.isRecordAck) {
+				processCommits();
+			}
 		}
 		catch (RuntimeException e) {
 			failureTimer(sample);
@@ -2307,7 +2330,9 @@ private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
 		private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
 				@Nullable List<ConsumerRecord<K, V>> list, RuntimeException rte) {
 
-			if (this.commonErrorHandler.seeksAfterHandling() || this.transactionManager != null) {
+			if (this.commonErrorHandler.seeksAfterHandling() || this.transactionManager != null
+					|| rte instanceof CommitFailedException) {
+
 				this.commonErrorHandler.handleBatch(rte, records, this.consumer,
 						KafkaMessageListenerContainer.this.thisOrParentContainer,
 						() -> invokeBatchOnMessageWithRecordsOrList(records, list));
@@ -2680,9 +2705,14 @@ record = this.recordInterceptor.intercept(record, this.consumer);
 		private void invokeErrorHandler(final ConsumerRecord<K, V> record,
 				Iterator<ConsumerRecord<K, V>> iterator, RuntimeException rte) {
 
-			if (this.commonErrorHandler.seeksAfterHandling()) {
-				if (this.producer == null) {
-					processCommits();
+			if (this.commonErrorHandler.seeksAfterHandling() || rte instanceof CommitFailedException) {
+				try {
+					if (this.producer == null) {
+						processCommits();
+					}
+				}
+				catch (Exception ex) { // NO SONAR
+					this.logger.error(ex, "Failed to commit before handling error");
 				}
 				List<ConsumerRecord<?, ?>> records = new ArrayList<>();
 				records.add(record);
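
Reviewer note on the main-source changes: `pollAndInvoke()` now funnels commits through `doProcessCommits()`, which catches `CommitFailedException` and hands any records pending after an error to `CommonErrorHandler.handleRemaining(...)` instead of letting the exception propagate. Batch listeners now commit inside `doInvokeBatchListener()` so a failed commit surfaces while the batch is still in hand, and both `invokeErrorHandler(...)` and `invokeBatchErrorHandler(...)` treat a `CommitFailedException` as a forced-seek case even when the handler is configured not to seek after errors. A minimal sketch of the configuration this affects; the bean shape and generics are illustrative assumptions, not part of the diff:

```java
// Hypothetical user configuration: DefaultErrorHandler with in-memory retries.
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> factory(ConsumerFactory<String, String> cf) {
	ConcurrentKafkaListenerContainerFactory<String, String> factory =
			new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(cf);
	DefaultErrorHandler eh = new DefaultErrorHandler();
	// Retry failed deliveries in memory rather than seeking back on each attempt...
	eh.setSeekAfterError(false);
	// ...but after this change a CommitFailedException still forces seeks, because
	// the in-memory position can no longer be committed reliably (e.g. after a rebalance).
	factory.setCommonErrorHandler(eh);
	return factory;
}
```
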
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksBatchListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksBatchListenerTests.java
index 1f158fc2ce..836ce6762c 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksBatchListenerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksBatchListenerTests.java
@@ -18,6 +18,7 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.BDDMockito.willAnswer;
@@ -97,9 +98,10 @@ public class DefaultErrorHandlerNoSeeksBatchListenerTests {
 	 */
 	@SuppressWarnings("unchecked")
 	@Test
-	void retriesWithNoSeeksAckModeBatch() throws Exception {
+	void retriesWithNoSeeksBatchListener() throws Exception {
 		assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
 		assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
 		this.registry.stop();
 		assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
 		InOrder inOrder = inOrder(this.consumer, this.producer);
@@ -110,11 +112,11 @@ void retriesWithNoSeeksAckModeBatch() throws Exception {
 		offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(1L));
 		inOrder.verify(this.consumer).commitSync(offsets, Duration.ofSeconds(60));
 		inOrder.verify(this.consumer).pause(any());
-		inOrder.verify(this.consumer).resume(any());
 		offsets = new LinkedHashMap<>();
 		offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
 		offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
 		inOrder.verify(this.consumer).commitSync(offsets, Duration.ofSeconds(60));
+		inOrder.verify(this.consumer).resume(any());
 		assertThat(this.config.ehException).isInstanceOf(ListenerExecutionFailedException.class);
 		assertThat(((ListenerExecutionFailedException) this.config.ehException).getGroupId()).isEqualTo(CONTAINER_ID);
 		assertThat(this.config.contents).contains("foo", "bar", "baz", "qux", "qux", "qux", "fiz", "buz");
@@ -130,6 +132,8 @@ public static class Config {
 
 		final CountDownLatch closeLatch = new CountDownLatch(1);
 
+		final CountDownLatch commitLatch = new CountDownLatch(2);
+
 		final AtomicBoolean fail = new AtomicBoolean(true);
 
 		final List<String> contents = new ArrayList<>();
@@ -199,6 +203,10 @@ public Consumer consumer() {
 					return new ConsumerRecords(Collections.emptyMap());
 				}
 			}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+			willAnswer(i -> {
+				this.commitLatch.countDown();
+				return null;
+			}).given(consumer).commitSync(anyMap(), any());
 			willAnswer(i -> {
 				this.closeLatch.countDown();
 				return null;
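
The existing no-seeks batch test changes because commits now happen before paused partitions are resumed: the `resume(...)` verification moves after the second `commitSync(...)`, and a new latch proves two commits occurred. The verification pattern in isolation, as a sketch (a mocked `Consumer`, with matchers statically imported from `org.mockito.ArgumentMatchers`):

```java
InOrder inOrder = inOrder(consumer);
inOrder.verify(consumer).pause(any());                              // partitions paused on error
inOrder.verify(consumer).commitSync(anyMap(), any(Duration.class)); // commit happens first...
inOrder.verify(consumer).resume(any());                             // ...and only then resume
```
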
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerSeekAfterCommitExceptionBatchAckTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerSeekAfterCommitExceptionBatchAckTests.java
new file mode 100644
index 0000000000..84bf2c1cb6
--- /dev/null
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerSeekAfterCommitExceptionBatchAckTests.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.listener.ContainerProperties.AckMode;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+
+/**
+ * @author Gary Russell
+ * @since 2.9
+ *
+ */
+@SpringJUnitConfig
+@DirtiesContext
+public class DefaultErrorHandlerSeekAfterCommitExceptionBatchAckTests {
+
+	@SuppressWarnings("rawtypes")
+	@Autowired
+	private Consumer consumer;
+
+	@Autowired
+	private Config config;
+
+	@Autowired
+	private KafkaListenerEndpointRegistry registry;
+
+	/*
+	 * Fail with commit exception - always seek.
+	 */
+	@SuppressWarnings("unchecked")
+	@Test
+	public void forceSeeksWithCommitException() throws Exception {
+		assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(this.config.handleLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		this.registry.stop();
+		assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		InOrder inOrder = inOrder(this.consumer, this.config.eh);
+		inOrder.verify(this.consumer).assign(any(Collection.class));
+		inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+		inOrder.verify(this.consumer).commitSync(any(), any());
+		inOrder.verify(this.config.eh).handleRemaining(any(), any(), any(), any());
+		verify(this.consumer, times(2)).seek(any(), anyLong());
+	}
+
+	@Configuration
+	@EnableKafka
+	public static class Config {
+
+		final List<String> contents = new ArrayList<>();
+
+		final List<Integer> deliveries = new ArrayList<>();
+
+		final CountDownLatch pollLatch = new CountDownLatch(4);
+
+		final CountDownLatch deliveryLatch = new CountDownLatch(1);
+
+		final CountDownLatch closeLatch = new CountDownLatch(1);
+
+		final CountDownLatch commitLatch = new CountDownLatch(1);
+
+		final CountDownLatch handleLatch = new CountDownLatch(1);
+
+		DefaultErrorHandler eh;
+
+		int count;
+
+		volatile org.apache.kafka.common.header.Header deliveryAttempt;
+
+		@KafkaListener(groupId = "grp",
+				topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
+						partitions = "#{'0,1,2'.split(',')}"))
+		public void foo(String in, @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
+			this.contents.add(in);
+			this.deliveries.add(delivery);
+			this.deliveryLatch.countDown();
+			if (++this.count == 4 || this.count == 5) { // part 1, offset 1, first and second times
+				throw new RuntimeException("foo");
+			}
+		}
+
+		@SuppressWarnings({ "rawtypes" })
+		@Bean
+		public ConsumerFactory consumerFactory() {
+			ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
+			final Consumer consumer = consumer();
+			given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
+					.willReturn(consumer);
+			return consumerFactory;
+		}
+
+		@SuppressWarnings({ "rawtypes", "unchecked" })
+		@Bean
+		public Consumer consumer() {
+			final Consumer consumer = mock(Consumer.class);
+			final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
+			final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
+			final TopicPartition topicPartition2 = new TopicPartition("foo", 2);
+			Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
+			records1.put(topicPartition0, Arrays.asList(
+					new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
+							new RecordHeaders(), Optional.empty()),
+					new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar",
+							new RecordHeaders(), Optional.empty())));
+			records1.put(topicPartition1, Arrays.asList(
+					new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz",
+							new RecordHeaders(), Optional.empty()),
+					new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
+							new RecordHeaders(), Optional.empty())));
+			records1.put(topicPartition2, Arrays.asList(
+					new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz",
+							new RecordHeaders(), Optional.empty()),
+					new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz",
+							new RecordHeaders(), Optional.empty())));
+			final AtomicInteger which = new AtomicInteger();
+			willAnswer(i -> {
+				this.pollLatch.countDown();
+				switch (which.getAndIncrement()) {
+					case 0:
+						return new ConsumerRecords(records1);
+					default:
+						try {
+							Thread.sleep(50);
+						}
+						catch (InterruptedException e) {
+							Thread.currentThread().interrupt();
+						}
+						return new ConsumerRecords(Collections.emptyMap());
+				}
+			}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+			List<TopicPartition> paused = new ArrayList<>();
+			AtomicBoolean first = new AtomicBoolean(true);
+			willAnswer(i -> {
+				this.commitLatch.countDown();
+				if (first.getAndSet(false)) {
+					throw new CommitFailedException();
+				}
+				return null;
+			}).given(consumer).commitSync(anyMap(), any());
+			willAnswer(i -> {
+				this.closeLatch.countDown();
+				return null;
+			}).given(consumer).close();
+			willAnswer(i -> {
+				paused.addAll(i.getArgument(0));
+				return null;
+			}).given(consumer).pause(any());
+			willAnswer(i -> {
+				return new HashSet<>(paused);
+			}).given(consumer).paused();
+			willAnswer(i -> {
+				paused.removeAll(i.getArgument(0));
+				return null;
+			}).given(consumer).resume(any());
+			return consumer;
+		}
+
+		@SuppressWarnings({ "rawtypes", "unchecked" })
+		@Bean
+		ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
+			ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
+			factory.setConsumerFactory(consumerFactory());
+			factory.getContainerProperties().setAckMode(AckMode.BATCH);
+			factory.getContainerProperties().setDeliveryAttemptHeader(true);
+			factory.setRecordInterceptor((record, consumer) -> {
+				Config.this.deliveryAttempt = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
+				return record;
+			});
+			this.eh = spy(new DefaultErrorHandler());
+			this.eh.setSeekAfterError(false);
+			willAnswer(inv -> {
+				try {
+					inv.callRealMethod();
+				}
+				finally {
+					this.handleLatch.countDown();
+				}
+				return null;
+			}).given(this.eh).handleRemaining(any(), any(), any(), any());
+			factory.setCommonErrorHandler(eh);
+			return factory;
+		}
+
+	}
+
+}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerSeekAfterCommitExceptionBatchListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerSeekAfterCommitExceptionBatchListenerTests.java
new file mode 100644
index 0000000000..69f8624fbf
--- /dev/null
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerSeekAfterCommitExceptionBatchListenerTests.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+
+/**
+ * @author Gary Russell
+ * @since 2.9
+ *
+ */
+@SpringJUnitConfig
+@DirtiesContext
+@SuppressWarnings("deprecation")
+public class DefaultErrorHandlerSeekAfterCommitExceptionBatchListenerTests {
+
+	private static final String CONTAINER_ID = "container";
+
+	@SuppressWarnings("rawtypes")
+	@Autowired
+	private Consumer consumer;
+
+	@SuppressWarnings("rawtypes")
+	@Autowired
+	private Producer producer;
+
+	@Autowired
+	private Config config;
+
+	@Autowired
+	private KafkaListenerEndpointRegistry registry;
+
+	@SuppressWarnings("unchecked")
+	@Test
+	void forceSeeksWithCommitException() throws Exception {
+		assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		this.registry.stop();
+		assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		InOrder inOrder = inOrder(this.consumer, this.producer, this.config.eh);
+		inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
+		inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+		Map<TopicPartition, OffsetAndMetadata> offsets = new LinkedHashMap<>();
+		inOrder.verify(this.consumer).commitSync(any(), any());
+		inOrder.verify(this.config.eh).handleBatch(any(), any(), any(), any(), any());
+	}
+
+	@Configuration
+	@EnableKafka
+	public static class Config {
+
+		final CountDownLatch pollLatch = new CountDownLatch(2);
+
+		final CountDownLatch deliveryLatch = new CountDownLatch(1);
+
+		final CountDownLatch commitLatch = new CountDownLatch(1);
+
+		final CountDownLatch closeLatch = new CountDownLatch(1);
+
+		final List<String> contents = new ArrayList<>();
+
+		DefaultErrorHandler eh;
+
+		volatile Exception ehException;
+
+		@KafkaListener(id = CONTAINER_ID, topics = "foo")
+		public void foo(List<String> in) {
+			this.deliveryLatch.countDown();
+		}
+
+		@SuppressWarnings({ "rawtypes" })
+		@Bean
+		public ConsumerFactory consumerFactory() {
+			ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
+			final Consumer consumer = consumer();
+			given(consumerFactory.createConsumer(CONTAINER_ID, "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
+					.willReturn(consumer);
+			return consumerFactory;
+		}
+
+		@SuppressWarnings({ "rawtypes", "unchecked" })
+		@Bean
+		public Consumer consumer() {
+			final Consumer consumer = mock(Consumer.class);
+			final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
+			final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
+			final TopicPartition topicPartition2 = new TopicPartition("foo", 2);
+			willAnswer(i -> {
+				((ConsumerRebalanceListener) i.getArgument(1)).onPartitionsAssigned(
+						Collections.singletonList(topicPartition1));
+				return null;
+			}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
+			Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
+			records1.put(topicPartition0, Arrays.asList(
+					new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
+							new RecordHeaders(), Optional.empty()),
+					new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar",
+							new RecordHeaders(), Optional.empty())));
+			records1.put(topicPartition1, Arrays.asList(
+					new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz",
+							new RecordHeaders(), Optional.empty()),
+					new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
+							new RecordHeaders(), Optional.empty())));
+			records1.put(topicPartition2, Arrays.asList(
+					new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz",
+							new RecordHeaders(), Optional.empty()),
+					new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz",
+							new RecordHeaders(), Optional.empty())));
+			final AtomicInteger which = new AtomicInteger();
+			willAnswer(i -> {
+				this.pollLatch.countDown();
+				switch (which.getAndIncrement()) {
+					case 0:
+						return new ConsumerRecords(records1);
+					default:
+						try {
+							Thread.sleep(50);
+						}
+						catch (InterruptedException e) {
+							Thread.currentThread().interrupt();
+						}
+						return new ConsumerRecords(Collections.emptyMap());
+				}
+			}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+			AtomicBoolean first = new AtomicBoolean(true);
+			willAnswer(i -> {
+				this.commitLatch.countDown();
+				if (first.getAndSet(false)) {
+					throw new CommitFailedException();
+				}
+				return null;
+			}).given(consumer).commitSync(anyMap(), any());
+			willAnswer(i -> {
+				this.closeLatch.countDown();
+				return null;
+			}).given(consumer).close();
+			willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
+			return consumer;
+		}
+
+		@SuppressWarnings({ "rawtypes", "unchecked" })
+		@Bean
+		public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
+			ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
+			factory.setConsumerFactory(consumerFactory());
+			factory.setBatchListener(true);
+			this.eh = spy(new DefaultErrorHandler());
+			this.eh.setSeekAfterError(false);
+			factory.setCommonErrorHandler(eh);
+			factory.getContainerProperties().setAssignmentCommitOption(AssignmentCommitOption.NEVER);
+			return factory;
+		}
+
+		@SuppressWarnings("rawtypes")
+		@Bean
+		public ProducerFactory producerFactory() {
+			ProducerFactory pf = mock(ProducerFactory.class);
+			given(pf.createProducer(isNull())).willReturn(producer());
+			given(pf.transactionCapable()).willReturn(true);
+			return pf;
+		}
+
+		@SuppressWarnings("rawtypes")
+		@Bean
+		public Producer producer() {
+			return mock(Producer.class);
+		}
+
+	}
+
+}
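
The batch-listener variant: the whole poll goes to the listener as a `List<String>`, the first container commit fails, and the new `rte instanceof CommitFailedException` clause routes the failure to `handleBatch(...)` so the entire batch can be redelivered after seeking. A sketch of the listener shape being exercised; the `batch` attribute mirrors the test's `factory.setBatchListener(true)`, and the id/topic are the test's own:

```java
import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;

public class BatchListenerSketch {

	// Invoked once per poll; the container commits the batch's offsets afterwards,
	// and a failed commit now reaches the error handler with the full batch in hand.
	@KafkaListener(id = "container", topics = "foo", batch = "true")
	public void listen(List<String> in) {
		// process the batch
	}

}
```
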
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerSeekAfterCommitExceptionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerSeekAfterCommitExceptionTests.java
new file mode 100644
index 0000000000..26db166fe8
--- /dev/null
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerSeekAfterCommitExceptionTests.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.listener.ContainerProperties.AckMode;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+
+/**
+ * @author Gary Russell
+ * @since 2.9
+ *
+ */
+@SpringJUnitConfig
+@DirtiesContext
+public class DefaultErrorHandlerSeekAfterCommitExceptionTests {
+
+	@SuppressWarnings("rawtypes")
+	@Autowired
+	private Consumer consumer;
+
+	@Autowired
+	private Config config;
+
+	@Autowired
+	private KafkaListenerEndpointRegistry registry;
+
+	/*
+	 * Fail with commit exception - always seek.
+	 */
+	@SuppressWarnings("unchecked")
+	@Test
+	public void forceSeeksWithCommitException() throws Exception {
+		assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		this.registry.stop();
+		assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		InOrder inOrder = inOrder(this.consumer, this.config.eh);
+		inOrder.verify(this.consumer).assign(any(Collection.class));
+		inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+		inOrder.verify(this.consumer).commitSync(
+				Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(1L)),
+				Duration.ofSeconds(60));
+		inOrder.verify(this.config.eh).handleRemaining(any(), any(), any(), any());
+		verify(this.consumer, times(3)).seek(any(), anyLong());
+	}
+
+	@Configuration
+	@EnableKafka
+	public static class Config {
+
+		final List<String> contents = new ArrayList<>();
+
+		final List<Integer> deliveries = new ArrayList<>();
+
+		final CountDownLatch pollLatch = new CountDownLatch(4);
+
+		final CountDownLatch deliveryLatch = new CountDownLatch(1);
+
+		final CountDownLatch closeLatch = new CountDownLatch(1);
+
+		final CountDownLatch commitLatch = new CountDownLatch(1);
+
+		DefaultErrorHandler eh;
+
+		int count;
+
+		volatile org.apache.kafka.common.header.Header deliveryAttempt;
+
+		@KafkaListener(groupId = "grp",
+				topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
+						partitions = "#{'0,1,2'.split(',')}"))
+		public void foo(String in, @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
+			this.contents.add(in);
+			this.deliveries.add(delivery);
+			this.deliveryLatch.countDown();
+			if (++this.count == 4 || this.count == 5) { // part 1, offset 1, first and second times
+				throw new RuntimeException("foo");
+			}
+		}
+
+		@SuppressWarnings({ "rawtypes" })
+		@Bean
+		public ConsumerFactory consumerFactory() {
+			ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
+			final Consumer consumer = consumer();
+			given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
+					.willReturn(consumer);
+			return consumerFactory;
+		}
+
+		@SuppressWarnings({ "rawtypes", "unchecked" })
+		@Bean
+		public Consumer consumer() {
+			final Consumer consumer = mock(Consumer.class);
+			final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
+			final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
+			final TopicPartition topicPartition2 = new TopicPartition("foo", 2);
+			Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
+			records1.put(topicPartition0, Arrays.asList(
+					new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
+							new RecordHeaders(), Optional.empty()),
+					new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar",
+							new RecordHeaders(), Optional.empty())));
+			records1.put(topicPartition1, Arrays.asList(
+					new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz",
+							new RecordHeaders(), Optional.empty()),
+					new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
+							new RecordHeaders(), Optional.empty())));
+			records1.put(topicPartition2, Arrays.asList(
+					new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz",
+							new RecordHeaders(), Optional.empty()),
+					new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz",
+							new RecordHeaders(), Optional.empty())));
+			final AtomicInteger which = new AtomicInteger();
+			willAnswer(i -> {
+				this.pollLatch.countDown();
+				switch (which.getAndIncrement()) {
+					case 0:
+						return new ConsumerRecords(records1);
+					default:
+						try {
+							Thread.sleep(50);
+						}
+						catch (InterruptedException e) {
+							Thread.currentThread().interrupt();
+						}
+						return new ConsumerRecords(Collections.emptyMap());
+				}
+			}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+			List<TopicPartition> paused = new ArrayList<>();
+			AtomicBoolean first = new AtomicBoolean(true);
+			willAnswer(i -> {
+				this.commitLatch.countDown();
+				if (first.getAndSet(false)) {
+					throw new CommitFailedException();
+				}
+				return null;
+			}).given(consumer).commitSync(anyMap(), any());
+			willAnswer(i -> {
+				this.closeLatch.countDown();
+				return null;
+			}).given(consumer).close();
+			willAnswer(i -> {
+				paused.addAll(i.getArgument(0));
+				return null;
+			}).given(consumer).pause(any());
+			willAnswer(i -> {
+				return new HashSet<>(paused);
+			}).given(consumer).paused();
+			willAnswer(i -> {
+				paused.removeAll(i.getArgument(0));
+				return null;
+			}).given(consumer).resume(any());
+			return consumer;
+		}
+
+		@SuppressWarnings({ "rawtypes", "unchecked" })
+		@Bean
+		ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
+			ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
+			factory.setConsumerFactory(consumerFactory());
+			factory.getContainerProperties().setAckMode(AckMode.RECORD);
+			factory.getContainerProperties().setDeliveryAttemptHeader(true);
+			factory.setRecordInterceptor((record, consumer) -> {
+				Config.this.deliveryAttempt = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
+				return record;
+			});
+			this.eh = spy(new DefaultErrorHandler());
+			this.eh.setSeekAfterError(false);
+			factory.setCommonErrorHandler(eh);
+			return factory;
+		}
+
+	}
+
+}
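
Finally, the `AckMode.RECORD` case: a commit is attempted after every record, so when the commit of offset 1 for partition 0 fails, the container invokes `handleRemaining(...)` and seeks all three partitions back to their first unprocessed offsets, again overriding `setSeekAfterError(false)`. What those three seeks amount to for this test's fixture, as a sketch (offsets inferred from the mocked records, for illustration only):

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class SeekSketch {

	// "foo" (partition 0, offset 0) was processed but its commit failed, so the
	// next poll() must re-fetch everything from the first unprocessed offsets.
	static void seekBack(Consumer<?, ?> consumer) {
		consumer.seek(new TopicPartition("foo", 0), 1L); // "bar" not yet processed
		consumer.seek(new TopicPartition("foo", 1), 0L); // "baz", "qux"
		consumer.seek(new TopicPartition("foo", 2), 0L); // "fiz", "buz"
	}

}
```
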