Skip to content

Commit bb99ec0

Browse files
committed
spring-projectsGH-2329: Fix interceptBeforeTx for Non-Kafka TM
Resolves spring-projects#2329 The flag should apply, regardless of PTM type.
1 parent 36665e9 commit bb99ec0

File tree

2 files changed

+78
-5
lines changed

2 files changed

+78
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -667,12 +667,13 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
667667

668668
private final Duration syncCommitTimeout;
669669

670-
private final RecordInterceptor<K, V> recordInterceptor = !isInterceptBeforeTx() && this.kafkaTxManager != null
671-
? getRecordInterceptor()
672-
: null;
670+
private final RecordInterceptor<K, V> recordInterceptor = !isInterceptBeforeTx()
671+
&& this.transactionManager != null
672+
? getRecordInterceptor()
673+
: null;
673674

674675
private final RecordInterceptor<K, V> earlyRecordInterceptor =
675-
isInterceptBeforeTx() || this.kafkaTxManager == null
676+
isInterceptBeforeTx() || this.transactionManager == null
676677
? getRecordInterceptor()
677678
: null;
678679

@@ -683,7 +684,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
683684
: null;
684685

685686
private final BatchInterceptor<K, V> earlyBatchInterceptor =
686-
isInterceptBeforeTx() || this.kafkaTxManager == null
687+
isInterceptBeforeTx() || this.transactionManager == null
687688
? getBatchInterceptor()
688689
: null;
689690

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

+72
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
6969
import org.springframework.lang.Nullable;
7070
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
71+
import org.springframework.transaction.PlatformTransactionManager;
7172
import org.springframework.transaction.support.TransactionSynchronizationManager;
7273

7374
/**
@@ -655,6 +656,77 @@ public void failure(ConsumerRecords records, Exception exception, Consumer consu
655656
}
656657
}
657658

659+
@Test
660+
@SuppressWarnings({ "rawtypes", "unchecked" })
661+
void testInterceptInTxNonKafkaTM() throws InterruptedException {
662+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
663+
final Consumer consumer = mock(Consumer.class);
664+
TopicPartition tp0 = new TopicPartition("foo", 0);
665+
ConsumerRecord record1 = new ConsumerRecord("foo", 0, 0L, "bar", "baz");
666+
ConsumerRecords records = new ConsumerRecords(
667+
Collections.singletonMap(tp0, Collections.singletonList(record1)));
668+
ConsumerRecords empty = new ConsumerRecords(Collections.emptyMap());
669+
AtomicInteger firstOrSecondPoll = new AtomicInteger();
670+
willAnswer(invocation -> {
671+
Thread.sleep(10);
672+
return firstOrSecondPoll.incrementAndGet() < 2 ? records : empty;
673+
}).given(consumer).poll(any());
674+
List<TopicPartition> assignments = Arrays.asList(tp0);
675+
willAnswer(invocation -> {
676+
((ConsumerRebalanceListener) invocation.getArgument(1))
677+
.onPartitionsAssigned(assignments);
678+
return null;
679+
}).given(consumer).subscribe(any(Collection.class), any());
680+
given(consumer.position(any())).willReturn(0L);
681+
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
682+
.willReturn(consumer);
683+
ContainerProperties containerProperties = new ContainerProperties("foo");
684+
containerProperties.setGroupId("grp");
685+
AtomicReference<List<ConsumerRecord<String, String>>> received = new AtomicReference<>();
686+
containerProperties.setMessageListener((MessageListener) rec -> {
687+
});
688+
containerProperties.setMissingTopicsFatal(false);
689+
List<String> order = new ArrayList<>();
690+
CountDownLatch latch = new CountDownLatch(2);
691+
PlatformTransactionManager tm = mock(PlatformTransactionManager.class);
692+
willAnswer(inv -> {
693+
order.add("tx");
694+
latch.countDown();
695+
return null;
696+
}).given(tm).getTransaction(any());
697+
containerProperties.setTransactionManager(tm);
698+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(consumerFactory,
699+
containerProperties);
700+
CountDownLatch successCalled = new CountDownLatch(1);
701+
container.setRecordInterceptor(new RecordInterceptor() {
702+
703+
@Override
704+
@Nullable
705+
public ConsumerRecord intercept(ConsumerRecord rec, Consumer consumer) {
706+
order.add("interceptor");
707+
latch.countDown();
708+
return rec;
709+
}
710+
711+
@Override
712+
public void success(ConsumerRecord record, Consumer consumer) {
713+
order.add("success");
714+
successCalled.countDown();
715+
}
716+
717+
});
718+
container.setInterceptBeforeTx(false);
719+
container.start();
720+
try {
721+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
722+
assertThat(successCalled.await(10, TimeUnit.SECONDS)).isTrue();
723+
assertThat(order).containsExactly("tx", "interceptor", "success");
724+
}
725+
finally {
726+
container.stop();
727+
}
728+
}
729+
658730
@SuppressWarnings({ "rawtypes", "unchecked" })
659731
@Test
660732
void testNoCommitOnAssignmentWithEarliest() throws InterruptedException {

0 commit comments

Comments
 (0)