Skip to content

Commit edfbd1d

Browse files
garyrussell authored and artembilan committed
GH-2340: Fix Retrying Batch Error Handling
Resolves #2340. The `RetryingBatchErrorHandler` - now called the `FallbackBatchErrorHandler` - pauses and resumes the consumer during retries, so that it can keep polling the consumer and avoid a forced rebalance. However, if a normal rebalance occurs, for example when a new member joins, the error handler does not re-pause the consumer and silently consumes new records. Add a mechanism to always re-pause the consumer when in this retry mode. **cherry-pick to 2.9.x, 2.8.x**
1 parent 9e123ef commit edfbd1d

File tree

7 files changed

+90
-0
lines changed

7 files changed

+90
-0
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java

+11
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Collection;
1920
import java.util.List;
2021

2122
import org.apache.commons.logging.LogFactory;
2223
import org.apache.kafka.clients.consumer.Consumer;
2324
import org.apache.kafka.clients.consumer.ConsumerRecord;
2425
import org.apache.kafka.clients.consumer.ConsumerRecords;
26+
import org.apache.kafka.common.TopicPartition;
2527

2628
import org.springframework.kafka.support.TopicPartitionOffset;
2729

@@ -212,4 +214,13 @@ default void setAckAfterHandle(boolean ack) {
212214
throw new UnsupportedOperationException("This error handler does not support setting this property");
213215
}
214216

217+
/**
218+
* Called when partitions are assigned.
219+
* @param consumer the consumer.
220+
* @param partitions the newly assigned partitions.
221+
* @since 2.8.8
222+
*/
223+
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
224+
}
225+
215226
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

+7
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Collection;
1920
import java.util.List;
2021

2122
import org.apache.kafka.clients.consumer.Consumer;
2223
import org.apache.kafka.clients.consumer.ConsumerRecord;
2324
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
import org.apache.kafka.common.TopicPartition;
2426
import org.apache.kafka.common.errors.SerializationException;
2527

2628
import org.springframework.kafka.support.KafkaUtils;
@@ -199,4 +201,9 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
199201
}
200202
}
201203

204+
@Override
205+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
206+
getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions);
207+
}
208+
202209
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java

+9
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Collection;
1920
import java.util.Collections;
2021
import java.util.List;
2122

2223
import org.apache.kafka.clients.consumer.Consumer;
2324
import org.apache.kafka.clients.consumer.ConsumerRecord;
2425
import org.apache.kafka.clients.consumer.ConsumerRecords;
26+
import org.apache.kafka.common.TopicPartition;
2527

2628
import org.springframework.kafka.support.TopicPartitionOffset;
2729
import org.springframework.util.Assert;
@@ -160,5 +162,12 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
160162
}
161163
}
162164

165+
@Override
166+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
167+
if (this.batchErrorHandler instanceof FallbackBatchErrorHandler) {
168+
((FallbackBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions);
169+
}
170+
}
171+
163172
}
164173

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

+10
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,16 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
6969
this(recoverer, backOff, null, fallbackHandler);
7070
}
7171

72+
/**
73+
* Return the fallback batch error handler.
74+
* @return the handler.
75+
* @since 2.8.8
76+
*/
77+
protected CommonErrorHandler getFallbackBatchHandler() {
78+
return this.fallbackBatchHandler;
79+
}
80+
81+
7282
/**
7383
* Construct an instance with the provided properties.
7484
* @param recoverer the recoverer.

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java

+12
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Collection;
1920
import java.util.function.BiConsumer;
2021

2122
import org.apache.commons.logging.LogFactory;
2223
import org.apache.kafka.clients.consumer.Consumer;
2324
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
import org.apache.kafka.common.TopicPartition;
2426

2527
import org.springframework.core.log.LogAccessor;
2628
import org.springframework.lang.Nullable;
@@ -54,6 +56,8 @@ class FallbackBatchErrorHandler extends KafkaExceptionLogLevelAware
5456

5557
private boolean ackAfterHandle = true;
5658

59+
private boolean retrying;
60+
5761
/**
5862
* Construct an instance with a default {@link FixedBackOff} (unlimited attempts with
5963
* a 5 second back off).
@@ -99,8 +103,16 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
99103
this.logger.error(thrownException, "Called with no records; consumer exception");
100104
return;
101105
}
106+
this.retrying = true;
102107
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
103108
this.seeker, this.recoverer, this.logger, getLogLevel());
109+
this.retrying = false;
110+
}
111+
112+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
113+
if (this.retrying) {
114+
consumer.pause(consumer.assignment());
115+
}
104116
}
105117

106118
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+4
Original file line numberDiff line numberDiff line change
@@ -3464,6 +3464,10 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
34643464
ListenerConsumer.this.firstPoll = true;
34653465
ListenerConsumer.this.consumerSeekAwareListener.onFirstPoll();
34663466
}
3467+
if (ListenerConsumer.this.commonErrorHandler != null) {
3468+
ListenerConsumer.this.commonErrorHandler.onPartitionsAssigned(ListenerConsumer.this.consumer,
3469+
partitions);
3470+
}
34673471
}
34683472

34693473
private void repauseIfNeeded(Collection<TopicPartition> partitions) {

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

+37
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.mockito.ArgumentMatchers.any;
2222
import static org.mockito.BDDMockito.given;
2323
import static org.mockito.BDDMockito.willAnswer;
24+
import static org.mockito.Mockito.inOrder;
2425
import static org.mockito.Mockito.mock;
2526
import static org.mockito.Mockito.times;
2627
import static org.mockito.Mockito.verify;
@@ -38,6 +39,7 @@
3839
import org.apache.kafka.clients.consumer.ConsumerRecords;
3940
import org.apache.kafka.common.TopicPartition;
4041
import org.junit.jupiter.api.Test;
42+
import org.mockito.InOrder;
4143

4244
import org.springframework.kafka.KafkaException;
4345
import org.springframework.util.backoff.FixedBackOff;
@@ -165,4 +167,39 @@ void exitOnContainerStop() {
165167
assertThat(this.invoked).isEqualTo(1);
166168
}
167169

170+
@Test
171+
void rePauseOnRebalance() {
172+
this.invoked = 0;
173+
List<ConsumerRecord<?, ?>> recovered = new ArrayList<>();
174+
FallbackBatchErrorHandler eh = new FallbackBatchErrorHandler(new FixedBackOff(0L, 1L), (cr, ex) -> {
175+
recovered.add(cr);
176+
});
177+
Map<TopicPartition, List<ConsumerRecord<Object, Object>>> map = new HashMap<>();
178+
map.put(new TopicPartition("foo", 0),
179+
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, "foo", "bar")));
180+
map.put(new TopicPartition("foo", 1),
181+
Collections.singletonList(new ConsumerRecord<>("foo", 1, 0L, "foo", "bar")));
182+
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
183+
Consumer<?, ?> consumer = mock(Consumer.class);
184+
willAnswer(inv -> {
185+
eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)));
186+
return records;
187+
}).given(consumer).poll(any());
188+
MessageListenerContainer container = mock(MessageListenerContainer.class);
189+
given(container.isRunning()).willReturn(true);
190+
eh.handle(new RuntimeException(), records, consumer, container, () -> {
191+
this.invoked++;
192+
throw new RuntimeException();
193+
});
194+
assertThat(this.invoked).isEqualTo(1);
195+
assertThat(recovered).hasSize(2);
196+
InOrder inOrder = inOrder(consumer);
197+
inOrder.verify(consumer).pause(any());
198+
inOrder.verify(consumer).poll(any());
199+
inOrder.verify(consumer).pause(any());
200+
inOrder.verify(consumer).resume(any());
201+
verify(consumer, times(3)).assignment();
202+
verifyNoMoreInteractions(consumer);
203+
}
204+
168205
}

0 commit comments

Comments
 (0)