Skip to content

Commit 6a28cc5

Browse files
Nikolay Mikheevgaryrussell
Nikolay Mikheev
authored andcommitted
GH-2382: Fix FBEH Cross Talk
Remove sharing of the retrying field in FallbackBatchErrorHandler (#2383) * Remove sharing of the retrying field in FallbackBatchErrorHandler * Fix checkstyle
1 parent 798dea3 commit 6a28cc5

File tree

2 files changed

+46
-6
lines changed

2 files changed

+46
-6
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class FallbackBatchErrorHandler extends KafkaExceptionLogLevelAware
5555

5656
private boolean ackAfterHandle = true;
5757

58-
private boolean retrying;
58+
private final ThreadLocal<Boolean> retrying = ThreadLocal.withInitial(() -> false);
5959

6060
/**
6161
* Construct an instance with a default {@link FixedBackOff} (unlimited attempts with
@@ -102,14 +102,18 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
102102
this.logger.error(thrownException, "Called with no records; consumer exception");
103103
return;
104104
}
105-
this.retrying = true;
106-
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
107-
this.seeker, this.recoverer, this.logger, getLogLevel());
108-
this.retrying = false;
105+
this.retrying.set(true);
106+
try {
107+
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
108+
this.seeker, this.recoverer, this.logger, getLogLevel());
109+
}
110+
finally {
111+
this.retrying.set(false);
112+
}
109113
}
110114

111115
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
112-
if (this.retrying) {
116+
if (this.retrying.get()) {
113117
consumer.pause(consumer.assignment());
114118
}
115119
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

+36
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
1920
import static org.assertj.core.api.Assertions.assertThat;
2021
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2123
import static org.mockito.ArgumentMatchers.any;
2224
import static org.mockito.BDDMockito.given;
2325
import static org.mockito.BDDMockito.willAnswer;
26+
import static org.mockito.BDDMockito.willThrow;
2427
import static org.mockito.Mockito.inOrder;
2528
import static org.mockito.Mockito.mock;
2629
import static org.mockito.Mockito.times;
2730
import static org.mockito.Mockito.verify;
2831
import static org.mockito.Mockito.verifyNoMoreInteractions;
2932

33+
import java.lang.reflect.Field;
3034
import java.util.ArrayList;
3135
import java.util.Collections;
3236
import java.util.HashMap;
@@ -42,6 +46,7 @@
4246
import org.mockito.InOrder;
4347

4448
import org.springframework.kafka.KafkaException;
49+
import org.springframework.util.ReflectionUtils;
4550
import org.springframework.util.backoff.FixedBackOff;
4651

4752
/**
@@ -205,4 +210,35 @@ void rePauseOnRebalance() {
205210
verifyNoMoreInteractions(consumer);
206211
}
207212

213+
@Test
214+
void resetRetryingFlagOnExceptionFromRetryBatch() {
215+
FallbackBatchErrorHandler eh = new FallbackBatchErrorHandler(new FixedBackOff(0L, 1L), (consumerRecord, e) -> { });
216+
217+
Consumer<?, ?> consumer = mock(Consumer.class);
218+
// KafkaException could be thrown from SeekToCurrentBatchErrorHandler, but it is hard to mock
219+
KafkaException exception = new KafkaException("Failed consumer.resume()");
220+
willThrow(exception).given(consumer).resume(any());
221+
222+
MessageListenerContainer container = mock(MessageListenerContainer.class);
223+
given(container.isRunning()).willReturn(true);
224+
225+
Map<TopicPartition, List<ConsumerRecord<Object, Object>>> map = new HashMap<>();
226+
map.put(new TopicPartition("foo", 0),
227+
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, "foo", "bar")));
228+
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
229+
230+
assertThatThrownBy(() -> eh.handle(new RuntimeException(), records, consumer, container, () -> { }))
231+
.isSameAs(exception);
232+
233+
assertThat(getRetryingFieldValue(eh))
234+
.withFailMessage("retrying field was not reset to false")
235+
.isFalse();
236+
}
237+
238+
private boolean getRetryingFieldValue(FallbackBatchErrorHandler errorHandler) {
239+
Field field = ReflectionUtils.findField(FallbackBatchErrorHandler.class, "retrying");
240+
ReflectionUtils.makeAccessible(field);
241+
ThreadLocal<Boolean> value = (ThreadLocal<Boolean>) ReflectionUtils.getField(field, errorHandler);
242+
return value.get();
243+
}
208244
}

0 commit comments

Comments
 (0)