Skip to content

Commit 6d3eb9a

Browse files
garyrussellartembilan
authored andcommitted
spring-projectsGH-566: Ack Concurrency Issue
Fixes spring-projects#566 If `Acknowledgment.acknowledge()` is called on a foreign thread, there is a concurrency problem with the `offsets` field; the consumer thread might `clear()` unprocessed acks. Furthermore, `AckMode.MANUAL_IMMEDIATE` cannot use the `Consumer` object if the ack is called on a foreign thread - the `Consumer` is not thread-safe. - Revert `offsets` to simple `HashMap`s - Only reference `offsets` on the consumer thread - enqueue foreign acks into the `acks` queue (even "immediate" acks) **Cherry pick to 2.0.x (and 1.3.x, fixing the lambda in the test and the diamond operators).**
1 parent b0be198 commit 6d3eb9a

File tree

2 files changed

+93
-9
lines changed

2 files changed

+93
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+22-8
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.util.Set;
3131
import java.util.concurrent.BlockingQueue;
3232
import java.util.concurrent.ConcurrentHashMap;
33-
import java.util.concurrent.ConcurrentMap;
3433
import java.util.concurrent.LinkedBlockingQueue;
3534
import java.util.concurrent.ScheduledFuture;
3635
import java.util.stream.Collectors;
@@ -52,6 +51,7 @@
5251
import org.apache.kafka.common.errors.WakeupException;
5352

5453
import org.springframework.core.task.SimpleAsyncTaskExecutor;
54+
import org.springframework.kafka.KafkaException;
5555
import org.springframework.kafka.core.ConsumerFactory;
5656
import org.springframework.kafka.core.KafkaResourceHolder;
5757
import org.springframework.kafka.core.ProducerFactoryUtils;
@@ -346,7 +346,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
346346

347347
private final Consumer<K, V> consumer;
348348

349-
private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsets = new ConcurrentHashMap<>();
349+
private final Map<String, Map<Integer, Long>> offsets = new HashMap<>();
350350

351351
private final GenericMessageListener<?> genericListener;
352352

@@ -406,6 +406,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
406406

407407
private volatile Collection<TopicPartition> assignedPartitions;
408408

409+
private volatile Thread consumerThread;
410+
409411
private int count;
410412

411413
private long last = System.currentTimeMillis();
@@ -662,6 +664,7 @@ public boolean isLongLived() {
662664

663665
@Override
664666
public void run() {
667+
this.consumerThread = Thread.currentThread();
665668
if (this.genericListener instanceof ConsumerSeekAware) {
666669
((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
667670
}
@@ -818,16 +821,27 @@ record = this.acks.poll();
818821
}
819822

820823
private void processAck(ConsumerRecord<K, V> record) {
821-
if (this.isManualImmediateAck) {
824+
if (!Thread.currentThread().equals(this.consumerThread)) {
822825
try {
823-
ackImmediate(record);
826+
this.acks.put(record);
824827
}
825-
catch (WakeupException e) {
826-
// ignore - not polling
828+
catch (InterruptedException e) {
829+
Thread.currentThread().interrupt();
830+
throw new KafkaException("Interrupted while storing ack", e);
827831
}
828832
}
829833
else {
830-
addOffset(record);
834+
if (this.isManualImmediateAck) {
835+
try {
836+
ackImmediate(record);
837+
}
838+
catch (WakeupException e) {
839+
// ignore - not polling
840+
}
841+
}
842+
else {
843+
addOffset(record);
844+
}
831845
}
832846
}
833847

@@ -1325,7 +1339,7 @@ private void commitIfNecessary() {
13251339

13261340
private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
13271341
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
1328-
for (Entry<String, ConcurrentMap<Integer, Long>> entry : this.offsets.entrySet()) {
1342+
for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
13291343
for (Entry<Integer, Long> offset : entry.getValue().entrySet()) {
13301344
commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
13311345
new OffsetAndMetadata(offset.getValue() + 1));

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+71-1
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,77 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
551551
container.stop();
552552
}
553553

554+
@Test
555+
public void testRecordAckMockForeignThread() throws Exception {
556+
testRecordAckMockForeignThreadGuts(AckMode.MANUAL);
557+
}
558+
559+
@Test
560+
public void testRecordAckMockForeignThreadImmediate() throws Exception {
561+
testRecordAckMockForeignThreadGuts(AckMode.MANUAL_IMMEDIATE);
562+
}
563+
564+
@SuppressWarnings("unchecked")
565+
private void testRecordAckMockForeignThreadGuts(AckMode ackMode) throws Exception {
566+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
567+
Consumer<Integer, String> consumer = mock(Consumer.class);
568+
given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer);
569+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
570+
records.put(new TopicPartition("foo", 0), Arrays.asList(
571+
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
572+
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
573+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
574+
given(consumer.poll(anyLong())).willAnswer(i -> {
575+
Thread.sleep(50);
576+
return consumerRecords;
577+
});
578+
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
579+
new TopicPartitionInitialOffset("foo", 0) };
580+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
581+
containerProps.setAckMode(ackMode);
582+
final CountDownLatch latch = new CountDownLatch(2);
583+
final List<Acknowledgment> acks = new ArrayList<>();
584+
final AtomicReference<Thread> consumerThread = new AtomicReference<>();
585+
AcknowledgingMessageListener<Integer, String> messageListener = spy(
586+
new AcknowledgingMessageListener<Integer, String>() {
587+
588+
@Override
589+
public void onMessage(ConsumerRecord<Integer, String> data, Acknowledgment acknowledgment) {
590+
acks.add(acknowledgment);
591+
consumerThread.set(Thread.currentThread());
592+
latch.countDown();
593+
if (latch.getCount() == 0) {
594+
records.clear();
595+
}
596+
}
597+
598+
});
599+
600+
final CountDownLatch commitLatch = new CountDownLatch(1);
601+
final AtomicReference<Thread> commitThread = new AtomicReference<>();
602+
willAnswer(i -> {
603+
commitThread.set(Thread.currentThread());
604+
commitLatch.countDown();
605+
return null;
606+
}
607+
).given(consumer).commitSync(any(Map.class));
608+
609+
containerProps.setMessageListener(messageListener);
610+
containerProps.setClientId("clientId");
611+
KafkaMessageListenerContainer<Integer, String> container =
612+
new KafkaMessageListenerContainer<>(cf, containerProps);
613+
container.start();
614+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
615+
acks.get(1).acknowledge();
616+
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
617+
InOrder inOrder = inOrder(messageListener, consumer);
618+
inOrder.verify(consumer).poll(1000);
619+
inOrder.verify(messageListener, times(2)).onMessage(any(ConsumerRecord.class), any(Acknowledgment.class));
620+
inOrder.verify(consumer).commitSync(any(Map.class));
621+
container.stop();
622+
assertThat(commitThread.get()).isSameAs(consumerThread.get());
623+
}
624+
554625
@SuppressWarnings("unchecked")
555626
@Test
556627
public void testNonResponsiveConsumerEvent() throws Exception {
@@ -1576,7 +1647,6 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
15761647
final CountDownLatch commitLatch = new CountDownLatch(2);
15771648
willAnswer(invocation -> {
15781649

1579-
@SuppressWarnings({ "unchecked" })
15801650
Map<TopicPartition, OffsetAndMetadata> map = invocation.getArgument(0);
15811651
try {
15821652
return invocation.callRealMethod();

0 commit comments

Comments
 (0)