Skip to content

Commit 51db0b6

Browse files
garyrussellartembilan
authored andcommitted
GH-2372: Commit Offsets for Skipped Records
Resolves #2372
1 parent 083d0d8 commit 51db0b6

File tree

2 files changed

+159
-3
lines changed

2 files changed

+159
-3
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -2315,9 +2315,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
23152315
}
23162316
if (this.producer != null || (!this.isAnyManualAck && !this.autoCommit)) {
23172317
if (this.nackSleepDurationMillis < 0) {
2318-
for (ConsumerRecord<K, V> record : getHighestOffsetRecords(records)) {
2319-
this.acks.put(record);
2320-
}
2318+
ackBatch(records);
23212319
}
23222320
if (this.producer != null) {
23232321
sendOffsetsToTransaction();
@@ -2332,6 +2330,12 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
23322330
}
23332331
}
23342332

2333+
private void ackBatch(final ConsumerRecords<K, V> records) throws InterruptedException {
2334+
for (ConsumerRecord<K, V> record : getHighestOffsetRecords(records)) {
2335+
this.acks.put(record);
2336+
}
2337+
}
2338+
23352339
private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> recordsArg,
23362340
@Nullable List<ConsumerRecord<K, V>> recordListArg) {
23372341

@@ -2571,6 +2575,12 @@ private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg)
25712575
if (next == null) {
25722576
this.logger.debug(() -> "BatchInterceptor returned null, skipping: "
25732577
+ nextArg + " with " + nextArg.count() + " records");
2578+
try {
2579+
ackBatch(nextArg);
2580+
}
2581+
catch (InterruptedException e) {
2582+
Thread.currentThread().interrupt();
2583+
}
25742584
}
25752585
}
25762586
return next;
@@ -2585,6 +2595,7 @@ record = this.earlyRecordInterceptor.intercept(record, this.consumer);
25852595
if (record == null) {
25862596
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
25872597
+ KafkaUtils.format(recordArg));
2598+
ackCurrent(recordArg);
25882599
}
25892600
}
25902601
return record;
@@ -2760,6 +2771,7 @@ record = this.recordInterceptor.intercept(record, this.consumer);
27602771
if (record == null) {
27612772
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
27622773
+ KafkaUtils.format(recordArg));
2774+
ackCurrent(recordArg);
27632775
}
27642776
else {
27652777
try {

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+144
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import org.junit.jupiter.params.provider.Arguments;
8989
import org.junit.jupiter.params.provider.EnumSource;
9090
import org.junit.jupiter.params.provider.MethodSource;
91+
import org.junit.jupiter.params.provider.ValueSource;
9192
import org.mockito.ArgumentCaptor;
9293
import org.mockito.InOrder;
9394

@@ -126,6 +127,7 @@
126127
import org.springframework.kafka.test.utils.KafkaTestUtils;
127128
import org.springframework.lang.Nullable;
128129
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
130+
import org.springframework.transaction.PlatformTransactionManager;
129131
import org.springframework.util.backoff.FixedBackOff;
130132

131133
/**
@@ -3735,6 +3737,148 @@ public void clearThreadState(Consumer<?, ?> consumer) {
37353737
container.stop();
37363738
}
37373739

3740+
private static Stream<Arguments> paramsForRecordAllSkipped() {
3741+
return Stream.of(
3742+
Arguments.of(AckMode.RECORD, false),
3743+
Arguments.of(AckMode.RECORD, true),
3744+
Arguments.of(AckMode.BATCH, false),
3745+
Arguments.of(AckMode.BATCH, true));
3746+
}
3747+
3748+
@ParameterizedTest(name = "{index} AckMode.{0} early intercept {1}")
3749+
@MethodSource("paramsForRecordAllSkipped")
3750+
@SuppressWarnings({"unchecked", "deprecation"})
3751+
public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early) throws Exception {
3752+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3753+
Consumer<Integer, String> consumer = mock(Consumer.class);
3754+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
3755+
ConsumerRecord<Integer, String> firstRecord = new ConsumerRecord<>("foo", 0, 0L, 1, "foo");
3756+
ConsumerRecord<Integer, String> secondRecord = new ConsumerRecord<>("foo", 0, 1L, 1, "bar");
3757+
Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
3758+
records.put(new TopicPartition("foo", 0), List.of(firstRecord, secondRecord));
3759+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
3760+
AtomicBoolean first = new AtomicBoolean(true);
3761+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
3762+
Thread.sleep(50);
3763+
return first.getAndSet(false) ? consumerRecords : ConsumerRecords.empty();
3764+
});
3765+
CountDownLatch latch = new CountDownLatch(1);
3766+
willAnswer(inv -> {
3767+
latch.countDown();
3768+
return null;
3769+
}).given(consumer).commitSync(any(), any());
3770+
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
3771+
new TopicPartitionOffset("foo", 0) };
3772+
3773+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
3774+
containerProps.setGroupId("grp");
3775+
containerProps.setAckMode(ackMode);
3776+
3777+
containerProps.setMessageListener((MessageListener) msg -> {
3778+
});
3779+
containerProps.setClientId("clientId");
3780+
3781+
RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {
3782+
3783+
@Override
3784+
@Nullable
3785+
public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
3786+
Consumer<Integer, String> consumer) {
3787+
3788+
return null;
3789+
}
3790+
3791+
});
3792+
3793+
KafkaMessageListenerContainer<Integer, String> container =
3794+
new KafkaMessageListenerContainer<>(cf, containerProps);
3795+
container.setRecordInterceptor(recordInterceptor);
3796+
container.setInterceptBeforeTx(early);
3797+
container.start();
3798+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
3799+
3800+
InOrder inOrder = inOrder(recordInterceptor, consumer);
3801+
inOrder.verify(recordInterceptor).setupThreadState(eq(consumer));
3802+
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
3803+
inOrder.verify(recordInterceptor).intercept(eq(firstRecord), eq(consumer));
3804+
if (ackMode.equals(AckMode.RECORD)) {
3805+
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))),
3806+
any(Duration.class));
3807+
}
3808+
else {
3809+
verify(consumer, never()).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))),
3810+
any(Duration.class));
3811+
}
3812+
inOrder.verify(recordInterceptor).intercept(eq(secondRecord), eq(consumer));
3813+
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
3814+
any(Duration.class));
3815+
container.stop();
3816+
}
3817+
3818+
@ParameterizedTest(name = "{index} early intercept {0}")
3819+
@ValueSource(booleans = { true, false })
3820+
@SuppressWarnings({"unchecked", "deprecation"})
3821+
public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception {
3822+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3823+
Consumer<Integer, String> consumer = mock(Consumer.class);
3824+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
3825+
ConsumerRecord<Integer, String> firstRecord = new ConsumerRecord<>("foo", 0, 0L, 1, "foo");
3826+
ConsumerRecord<Integer, String> secondRecord = new ConsumerRecord<>("foo", 0, 1L, 1, "bar");
3827+
Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
3828+
records.put(new TopicPartition("foo", 0), List.of(firstRecord, secondRecord));
3829+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
3830+
AtomicBoolean first = new AtomicBoolean(true);
3831+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
3832+
Thread.sleep(50);
3833+
return first.getAndSet(false) ? consumerRecords : ConsumerRecords.empty();
3834+
});
3835+
CountDownLatch latch = new CountDownLatch(1);
3836+
willAnswer(inv -> {
3837+
latch.countDown();
3838+
return null;
3839+
}).given(consumer).commitSync(any(), any());
3840+
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
3841+
new TopicPartitionOffset("foo", 0) };
3842+
3843+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
3844+
containerProps.setGroupId("grp");
3845+
containerProps.setAckMode(AckMode.BATCH);
3846+
3847+
containerProps.setMessageListener((BatchMessageListener) msgs -> {
3848+
});
3849+
containerProps.setClientId("clientId");
3850+
if (!early) {
3851+
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
3852+
}
3853+
3854+
BatchInterceptor<Integer, String> interceptor = spy(new BatchInterceptor<Integer, String>() {
3855+
3856+
@Override
3857+
@Nullable
3858+
public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, String> records,
3859+
Consumer<Integer, String> consumer) {
3860+
3861+
return null;
3862+
}
3863+
3864+
});
3865+
3866+
KafkaMessageListenerContainer<Integer, String> container =
3867+
new KafkaMessageListenerContainer<>(cf, containerProps);
3868+
container.setBatchInterceptor(interceptor);
3869+
container.setInterceptBeforeTx(early);
3870+
container.start();
3871+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
3872+
3873+
InOrder inOrder = inOrder(interceptor, consumer);
3874+
inOrder.verify(interceptor).setupThreadState(eq(consumer));
3875+
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
3876+
inOrder.verify(interceptor).intercept(any(), eq(consumer));
3877+
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
3878+
any(Duration.class));
3879+
container.stop();
3880+
}
3881+
37383882
@Test
37393883
@SuppressWarnings({"unchecked", "deprecation"})
37403884
public void testInvokeRecordInterceptorFailure() throws Exception {

0 commit comments

Comments
 (0)