Skip to content

Commit 612eb70

Browse files
author
Nikolay Mikheev
authored
Remove sharing of the retrying field in FallbackBatchErrorHandler (#2383)
* Remove sharing of the retrying field in FallbackBatchErrorHandler * Fix checkstyle
1 parent b42a125 commit 612eb70

File tree

2 files changed

+46
-6
lines changed

2 files changed

+46
-6
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class FallbackBatchErrorHandler extends KafkaExceptionLogLevelAware
5656

5757
private boolean ackAfterHandle = true;
5858

59-
private boolean retrying;
59+
private final ThreadLocal<Boolean> retrying = ThreadLocal.withInitial(() -> false);
6060

6161
/**
6262
* Construct an instance with a default {@link FixedBackOff} (unlimited attempts with
@@ -103,14 +103,18 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
103103
this.logger.error(thrownException, "Called with no records; consumer exception");
104104
return;
105105
}
106-
this.retrying = true;
107-
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
108-
this.seeker, this.recoverer, this.logger, getLogLevel());
109-
this.retrying = false;
106+
this.retrying.set(true);
107+
try {
108+
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
109+
this.seeker, this.recoverer, this.logger, getLogLevel());
110+
}
111+
finally {
112+
this.retrying.set(false);
113+
}
110114
}
111115

112116
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
113-
if (this.retrying) {
117+
if (this.retrying.get()) {
114118
consumer.pause(consumer.assignment());
115119
}
116120
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

+36
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
1920
import static org.assertj.core.api.Assertions.assertThat;
2021
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2123
import static org.mockito.ArgumentMatchers.any;
2224
import static org.mockito.BDDMockito.given;
2325
import static org.mockito.BDDMockito.willAnswer;
26+
import static org.mockito.BDDMockito.willThrow;
2427
import static org.mockito.Mockito.inOrder;
2528
import static org.mockito.Mockito.mock;
2629
import static org.mockito.Mockito.times;
2730
import static org.mockito.Mockito.verify;
2831
import static org.mockito.Mockito.verifyNoMoreInteractions;
2932

33+
import java.lang.reflect.Field;
3034
import java.util.ArrayList;
3135
import java.util.Collections;
3236
import java.util.HashMap;
@@ -42,6 +46,7 @@
4246
import org.mockito.InOrder;
4347

4448
import org.springframework.kafka.KafkaException;
49+
import org.springframework.util.ReflectionUtils;
4550
import org.springframework.util.backoff.FixedBackOff;
4651

4752
/**
@@ -202,4 +207,35 @@ void rePauseOnRebalance() {
202207
verifyNoMoreInteractions(consumer);
203208
}
204209

210+
@Test
211+
void resetRetryingFlagOnExceptionFromRetryBatch() {
212+
FallbackBatchErrorHandler eh = new FallbackBatchErrorHandler(new FixedBackOff(0L, 1L), (consumerRecord, e) -> { });
213+
214+
Consumer<?, ?> consumer = mock(Consumer.class);
215+
// KafkaException could be thrown from SeekToCurrentBatchErrorHandler, but it is hard to mock
216+
KafkaException exception = new KafkaException("Failed consumer.resume()");
217+
willThrow(exception).given(consumer).resume(any());
218+
219+
MessageListenerContainer container = mock(MessageListenerContainer.class);
220+
given(container.isRunning()).willReturn(true);
221+
222+
Map<TopicPartition, List<ConsumerRecord<Object, Object>>> map = new HashMap<>();
223+
map.put(new TopicPartition("foo", 0),
224+
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, "foo", "bar")));
225+
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
226+
227+
assertThatThrownBy(() -> eh.handle(new RuntimeException(), records, consumer, container, () -> { }))
228+
.isSameAs(exception);
229+
230+
assertThat(getRetryingFieldValue(eh))
231+
.withFailMessage("retrying field was not reset to false")
232+
.isFalse();
233+
}
234+
235+
private boolean getRetryingFieldValue(FallbackBatchErrorHandler errorHandler) {
236+
Field field = ReflectionUtils.findField(FallbackBatchErrorHandler.class, "retrying");
237+
ReflectionUtils.makeAccessible(field);
238+
ThreadLocal<Boolean> value = (ThreadLocal<Boolean>) ReflectionUtils.getField(field, errorHandler);
239+
return value.get();
240+
}
205241
}

0 commit comments

Comments
 (0)