Skip to content

Commit c26f26e

Browse files
authored
spring-projectsGH-3166: BatchInterceptor issues with retries
Fixes: spring-projects#3166 * When retries are enabled, batch interceptor is not invoking the intercept methods for failures on retries and the possible eventual success method call. Addressing this issue. * Addressing PR review **Auto-cherry-pick to `3.1.x` & `3.0.x`**
1 parent 43df65f commit c26f26e

File tree

2 files changed

+93
-16
lines changed

2 files changed

+93
-16
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+19-15
Original file line numberDiff line numberDiff line change
@@ -2187,12 +2187,8 @@ private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V>
21872187
@Nullable
21882188
private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> records, // NOSONAR
21892189
List<ConsumerRecord<K, V>> recordList) {
2190-
2191-
Object sample = startMicrometerSample();
21922190
try {
21932191
invokeBatchOnMessage(records, recordList);
2194-
batchInterceptAfter(records, null);
2195-
successTimer(sample, null);
21962192
if (this.batchFailed) {
21972193
this.batchFailed = false;
21982194
if (this.commonErrorHandler != null) {
@@ -2205,9 +2201,6 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
22052201
}
22062202
}
22072203
catch (RuntimeException e) {
2208-
this.batchFailed = true;
2209-
failureTimer(sample, null);
2210-
batchInterceptAfter(records, e);
22112204
if (this.commonErrorHandler == null) {
22122205
throw e;
22132206
}
@@ -2359,15 +2352,26 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
23592352
recordList = createRecordList(records);
23602353
}
23612354
}
2362-
if (this.wantsFullRecords) {
2363-
this.batchListener.onMessage(records, // NOSONAR
2364-
this.isAnyManualAck
2365-
? new ConsumerBatchAcknowledgment(records, recordList)
2366-
: null,
2367-
this.consumer);
2355+
Object sample = startMicrometerSample();
2356+
try {
2357+
if (this.wantsFullRecords) {
2358+
this.batchListener.onMessage(records, // NOSONAR
2359+
this.isAnyManualAck
2360+
? new ConsumerBatchAcknowledgment(records, recordList)
2361+
: null,
2362+
this.consumer);
2363+
}
2364+
else {
2365+
doInvokeBatchOnMessage(records, recordList); // NOSONAR
2366+
}
2367+
batchInterceptAfter(records, null);
2368+
successTimer(sample, null);
23682369
}
2369-
else {
2370-
doInvokeBatchOnMessage(records, recordList); // NOSONAR
2370+
catch (RuntimeException e) {
2371+
this.batchFailed = true;
2372+
failureTimer(sample, null);
2373+
batchInterceptAfter(records, e);
2374+
throw e;
23712375
}
23722376
}
23732377

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+74-1
Original file line numberDiff line numberDiff line change
@@ -4019,7 +4019,6 @@ public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, Strin
40194019
inOrder.verify(interceptor).setupThreadState(eq(consumer));
40204020
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
40214021
inOrder.verify(interceptor).intercept(any(), eq(consumer));
4022-
inOrder.verify(interceptor).success(any(), eq(consumer));
40234022
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
40244023
any(Duration.class));
40254024
container.stop();
@@ -4240,6 +4239,80 @@ public void clearThreadState(Consumer<?, ?> consumer) {
42404239
container.stop();
42414240
}
42424241

4242+
@Test
4243+
@SuppressWarnings("unchecked")
4244+
public void invokeBatchInterceptorSuccessFailureOnRetry() throws Exception {
4245+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
4246+
Consumer<Integer, String> consumer = mock(Consumer.class);
4247+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
4248+
ConsumerRecord<Integer, String> firstRecord = new ConsumerRecord<>("test-topic", 0, 0L, 1, "data-1");
4249+
ConsumerRecord<Integer, String> secondRecord = new ConsumerRecord<>("test-topic", 0, 1L, 1, "data-2");
4250+
Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
4251+
records.put(new TopicPartition("test-topic", 0), List.of(firstRecord, secondRecord));
4252+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
4253+
AtomicInteger invocation = new AtomicInteger(0);
4254+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
4255+
if (invocation.getAndIncrement() == 0) {
4256+
return consumerRecords;
4257+
}
4258+
else {
4259+
// Subsequent polls after the first one returns empty records.
4260+
return new ConsumerRecords<Integer, String>(Map.of());
4261+
}
4262+
});
4263+
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
4264+
new TopicPartitionOffset("test-topic", 0) };
4265+
4266+
CountDownLatch latch = new CountDownLatch(4); // 3 failures, 1 success
4267+
BatchMessageListener<Integer, String> batchMessageListener = spy(
4268+
new BatchMessageListener<Integer, String>() { // Cannot be lambda: Mockito doesn't mock final classes
4269+
4270+
@Override
4271+
public void onMessage(List<ConsumerRecord<Integer, String>> data) {
4272+
latch.countDown();
4273+
if (latch.getCount() > 0) {
4274+
throw new IllegalArgumentException("Failed record");
4275+
}
4276+
}
4277+
4278+
});
4279+
4280+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
4281+
containerProps.setGroupId("grp");
4282+
containerProps.setAckMode(ContainerProperties.AckMode.BATCH);
4283+
containerProps.setMissingTopicsFatal(false);
4284+
containerProps.setMessageListener(batchMessageListener);
4285+
containerProps.setClientId("clientId");
4286+
4287+
BatchInterceptor<Integer, String> batchInterceptor = spy(new BatchInterceptor<Integer, String>() {
4288+
4289+
@Override
4290+
public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, String> records,
4291+
Consumer<Integer, String> consumer) {
4292+
return records;
4293+
}
4294+
4295+
});
4296+
4297+
KafkaMessageListenerContainer<Integer, String> container =
4298+
new KafkaMessageListenerContainer<>(cf, containerProps);
4299+
container.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0, 3)));
4300+
container.setBatchInterceptor(batchInterceptor);
4301+
container.start();
4302+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
4303+
4304+
InOrder inOrder = inOrder(batchInterceptor, batchMessageListener, consumer);
4305+
for (int i = 0; i < 3; i++) {
4306+
inOrder.verify(batchInterceptor).intercept(eq(consumerRecords), eq(consumer));
4307+
inOrder.verify(batchMessageListener).onMessage(eq(List.of(firstRecord, secondRecord)));
4308+
inOrder.verify(batchInterceptor).failure(eq(consumerRecords), any(), eq(consumer));
4309+
}
4310+
inOrder.verify(batchInterceptor).intercept(eq(consumerRecords), eq(consumer));
4311+
inOrder.verify(batchMessageListener).onMessage(eq(List.of(firstRecord, secondRecord)));
4312+
inOrder.verify(batchInterceptor).success(eq(consumerRecords), eq(consumer));
4313+
container.stop();
4314+
}
4315+
42434316
@Test
42444317
public void testOffsetAndMetadataWithoutProvider() throws InterruptedException {
42454318
testOffsetAndMetadata(null, new OffsetAndMetadata(1));

0 commit comments

Comments
 (0)