|
35 | 35 | import static org.springframework.kafka.test.assertj.KafkaConditions.value;
|
36 | 36 |
|
37 | 37 | import java.time.Duration;
|
38 |
| -import java.util.ArrayList; |
39 | 38 | import java.util.Collection;
|
40 | 39 | import java.util.Collections;
|
41 | 40 | import java.util.HashMap;
|
42 |
| -import java.util.List; |
43 | 41 | import java.util.Map;
|
44 | 42 | import java.util.concurrent.CountDownLatch;
|
45 | 43 | import java.util.concurrent.TimeUnit;
|
|
90 | 88 | import org.springframework.kafka.support.KafkaHeaders;
|
91 | 89 | import org.springframework.kafka.support.KafkaNull;
|
92 | 90 | import org.springframework.kafka.support.SendResult;
|
93 |
| -import org.springframework.kafka.support.TransactionSupport; |
94 | 91 | import org.springframework.kafka.support.converter.RecordMessageConverter;
|
95 | 92 | import org.springframework.kafka.test.EmbeddedKafkaBroker;
|
96 | 93 | import org.springframework.kafka.test.utils.KafkaTestUtils;
|
@@ -525,13 +522,8 @@ void testConsumeAndProduceTransaction() throws Exception {
|
525 | 522 | return null;
|
526 | 523 | }).given(producer).close(any());
|
527 | 524 | ProducerFactory pf = mock(ProducerFactory.class);
|
528 |
| - given(pf.isProducerPerConsumerPartition()).willReturn(true); |
529 | 525 | given(pf.transactionCapable()).willReturn(true);
|
530 |
| - final List<String> transactionalIds = new ArrayList<>(); |
531 |
| - willAnswer(i -> { |
532 |
| - transactionalIds.add(TransactionSupport.getTransactionIdSuffix()); |
533 |
| - return producer; |
534 |
| - }).given(pf).createProducer(isNull()); |
| 526 | + willReturn(producer).given(pf).createProducer(isNull()); |
535 | 527 | PlatformTransactionManager ptm = new KafkaTransactionManager(pf);
|
536 | 528 | ContainerProperties props = new ContainerProperties("foo");
|
537 | 529 | props.setGroupId("group");
|
@@ -567,8 +559,6 @@ void testConsumeAndProduceTransaction() throws Exception {
|
567 | 559 | container.stop();
|
568 | 560 | verify(pf, times(2)).createProducer(isNull());
|
569 | 561 | verifyNoMoreInteractions(producer);
|
570 |
| - assertThat(transactionalIds.get(0)).isEqualTo("group.foo.0"); |
571 |
| - assertThat(transactionalIds.get(1)).isEqualTo("group.foo.0"); |
572 | 562 | }
|
573 | 563 |
|
574 | 564 | @SuppressWarnings({ "rawtypes", "unchecked" })
|
@@ -670,17 +660,14 @@ void testConsumeAndProduceTransactionTxIdOverride() throws Exception {
|
670 | 660 | return null;
|
671 | 661 | }).given(producer).close(any());
|
672 | 662 | AtomicReference<String> txId = new AtomicReference<>();
|
673 |
| - final List<String> transactionalIds = new ArrayList<>(); |
674 | 663 | DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(Collections.emptyMap()) {
|
675 | 664 |
|
676 | 665 | @Override
|
677 |
| - protected Producer createTransactionalProducerForPartition(String txIdPrefix) { |
| 666 | + protected Producer createTransactionalProducer(String txIdPrefix) { |
678 | 667 | txId.set(txIdPrefix);
|
679 |
| - transactionalIds.add(TransactionSupport.getTransactionIdSuffix()); |
680 | 668 | return producer;
|
681 | 669 | }
|
682 | 670 |
|
683 |
| - |
684 | 671 | };
|
685 | 672 | pf.setTransactionIdPrefix("default.tx.id.");
|
686 | 673 | KafkaTransactionManager tm = new KafkaTransactionManager(pf);
|
@@ -719,9 +706,6 @@ protected Producer createTransactionalProducerForPartition(String txIdPrefix) {
|
719 | 706 | inOrder.verify(producer).close(any());
|
720 | 707 | container.stop();
|
721 | 708 | verifyNoMoreInteractions(producer);
|
722 |
| - assertThat(transactionalIds).hasSizeGreaterThanOrEqualTo(2); |
723 |
| - assertThat(transactionalIds.get(0)).isEqualTo("group.foo.0"); |
724 |
| - assertThat(transactionalIds.get(1)).isEqualTo("group.foo.0"); |
725 | 709 | assertThat(txId.get()).isEqualTo("tm.tx.id.");
|
726 | 710 | }
|
727 | 711 |
|
|
0 commit comments