
Commit 41196ac

GH-623: Fix AckMode.COUNT

Fixes spring-projects#623. Grab ack count before moving acks to offsets.
1 parent 6d3eb9a commit 41196ac

File tree: 3 files changed (+71, -1 lines)


spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+1, -1
@@ -1170,8 +1170,8 @@ private void sendOffsetsToTransaction(Producer producer) {
         }
 
         private void processCommits() {
-            handleAcks();
             this.count += this.acks.size();
+            handleAcks();
             long now;
             AckMode ackMode = this.containerProperties.getAckMode();
             if (!this.isManualImmediateAck) {
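
The one-line reorder above is the entire fix. In AckMode.COUNT the container commits once count reaches the configured ackCount, but handleAcks() moves the pending acks into the offsets to be committed, emptying the ack collection, so taking this.acks.size() after that move always added zero. Below is a minimal standalone sketch of that interaction, not the container's actual implementation; the pendingAcks queue, the single-partition offsets map, and the class name are hypothetical stand-ins.

// Minimal sketch of the ordering bug fixed above; NOT the container's real code.
// "pendingAcks", "offsets", and the hard-coded single partition are hypothetical stand-ins.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class CountAckSketch {

    private final Deque<Long> pendingAcks = new ArrayDeque<>(); // acked record offsets awaiting commit
    private final Map<Integer, Long> offsets = new HashMap<>(); // partition -> next offset to commit
    private final int ackCount = 3;
    private int count;

    // Stand-in for handleAcks(): moves acks into the offsets map, emptying the queue.
    private void handleAcks() {
        Long offset;
        while ((offset = this.pendingAcks.poll()) != null) {
            this.offsets.put(0, offset + 1);
        }
    }

    private void processCommits() {
        this.count += this.pendingAcks.size(); // fixed order: count the acks BEFORE they are moved
        handleAcks();                          // counting after this call always saw an empty queue
        if (this.count >= this.ackCount) {
            // commitSync(this.offsets) would happen here
            this.count = 0;
        }
    }

    public void ack(long offset) {
        this.pendingAcks.add(offset);
        processCommits();
    }

}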

spring-kafka/src/main/java/org/springframework/kafka/support/SendResult.java

+5
@@ -47,4 +47,9 @@ public RecordMetadata getRecordMetadata() {
         return this.recordMetadata;
     }
 
+    @Override
+    public String toString() {
+        return "SendResult [producerRecord=" + this.producerRecord + ", recordMetadata=" + this.recordMetadata + "]";
+    }
+
 }
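
SendResult had no toString() of its own before this change, so logging one printed the default Object representation. Below is a hypothetical usage sketch where the new output is visible, assuming a configured KafkaTemplate<Integer, String>; the topic, key, and value are placeholders.

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class SendResultLoggingSketch {

    // Hypothetical sketch: the new toString() makes the callback's log line readable.
    public void sendAndLog(KafkaTemplate<Integer, String> template) {
        template.send("foo", 1, "bar")
                .addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {

                    @Override
                    public void onSuccess(SendResult<Integer, String> result) {
                        // prints e.g. "SendResult [producerRecord=ProducerRecord(...), recordMetadata=foo-0@0]"
                        System.out.println(result);
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        ex.printStackTrace();
                    }

                });
    }

}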

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+65
@@ -1863,6 +1863,71 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         container.stop();
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Test
+    public void testAckModeCount() throws Exception {
+        ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
+        Consumer<Integer, String> consumer = mock(Consumer.class);
+        given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer);
+        TopicPartition topicPartition = new TopicPartition("foo", 0);
+        final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records1 = new HashMap<>();
+        records1.put(topicPartition, Arrays.asList(
+                new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
+                new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
+        final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records2 = new HashMap<>();
+        records2.put(topicPartition, Arrays.asList(
+                new ConsumerRecord<>("foo", 0, 2L, 1, "baz"),
+                new ConsumerRecord<>("foo", 0, 3L, 1, "qux"))); // commit (4 >= 3)
+        final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records3 = new HashMap<>();
+        records3.put(topicPartition, Arrays.asList(
+                new ConsumerRecord<>("foo", 0, 4L, 1, "fiz"),
+                new ConsumerRecord<>("foo", 0, 5L, 1, "buz"),
+                new ConsumerRecord<>("foo", 0, 6L, 1, "bif"))); // commit (3 >= 3)
+        ConsumerRecords<Integer, String> consumerRecords1 = new ConsumerRecords<>(records1);
+        ConsumerRecords<Integer, String> consumerRecords2 = new ConsumerRecords<>(records2);
+        ConsumerRecords<Integer, String> consumerRecords3 = new ConsumerRecords<>(records3);
+        ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
+        AtomicInteger which = new AtomicInteger();
+        given(consumer.poll(anyLong())).willAnswer(i -> {
+            Thread.sleep(50);
+            int recordsToUse = which.incrementAndGet();
+            switch (recordsToUse) {
+                case 1:
+                    return consumerRecords1;
+                case 2:
+                    return consumerRecords2;
+                case 3:
+                    return consumerRecords3;
+                default:
+                    return emptyRecords;
+            }
+        });
+        final CountDownLatch commitLatch = new CountDownLatch(2);
+        willAnswer(i -> {
+            commitLatch.countDown();
+            return null;
+        }).given(consumer).commitSync(any(Map.class));
+        given(consumer.assignment()).willReturn(records1.keySet());
+        TopicPartitionInitialOffset[] topicPartitionOffset = new TopicPartitionInitialOffset[] {
+                new TopicPartitionInitialOffset("foo", 0) };
+        ContainerProperties containerProps = new ContainerProperties(topicPartitionOffset);
+        containerProps.setAckMode(AckMode.COUNT);
+        containerProps.setAckCount(3);
+        containerProps.setClientId("clientId");
+        AtomicInteger recordCount = new AtomicInteger();
+        containerProps.setMessageListener((MessageListener) r -> {
+            recordCount.incrementAndGet();
+        });
+        KafkaMessageListenerContainer<Integer, String> container =
+                new KafkaMessageListenerContainer<>(cf, containerProps);
+        container.start();
+        assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
+        assertThat(recordCount.get()).isEqualTo(7);
+        verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(4L)));
+        verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(7L)));
+        container.stop();
+    }
+
     private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
         Consumer<?, ?> consumer = spy(
                 KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class));
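
The mocked test above feeds the container batches of 2, 2, and 3 records with an ackCount of 3, so it expects commits at offsets 4 (after 2 + 2 records) and 7 (after 3 more). For reference, enabling the same count-based committing in application code comes down to the two properties the test sets; below is a minimal configuration sketch, with the topic name and listener body as placeholders (import locations reflect the package layout of this era of spring-kafka and may differ in later versions).

import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;

public class CountAckConfigSketch {

    // Minimal sketch of count-based committing; "foo" and the listener body are placeholders.
    public ContainerProperties countModeProperties() {
        ContainerProperties props = new ContainerProperties("foo");
        props.setAckMode(AckMode.COUNT);
        props.setAckCount(3); // commit once at least 3 records have been processed since the last commit
        props.setMessageListener((MessageListener<Integer, String>) record ->
                System.out.println("received " + record.value()));
        return props;
    }

}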
