diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListener.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListener.java index 044c451e2..3ac6ef5e8 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListener.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListener.java @@ -26,9 +26,9 @@ import org.apache.pulsar.common.schema.SchemaType; import org.springframework.messaging.handler.annotation.MessageMapping; +import org.springframework.pulsar.config.PulsarListenerContainerFactory; import org.springframework.pulsar.reactive.config.ReactivePulsarListenerContainerFactory; import org.springframework.pulsar.reactive.config.ReactivePulsarListenerEndpointRegistry; -import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer; /** * Annotation that marks a method to be the target of a Pulsar message listener on the @@ -67,7 +67,9 @@ /** * Pulsar subscription name associated with this listener. - * @return the {@code subscriptionName} for this Pulsar listener endpoint. + *

+ * SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. + * @return the subscription name for this listener */ String subscriptionName() default ""; @@ -86,8 +88,8 @@ SchemaType schemaType() default SchemaType.NONE; /** - * The bean name of the {@link ReactivePulsarListenerContainerFactory} to use to - * create the message listener container responsible to serve this endpoint. + * The bean name of the {@link PulsarListenerContainerFactory} to use to create the + * message listener container responsible to serve this endpoint. *

* If not specified, the default container factory is used, if any. If a SpEL * expression is provided ({@code #{...}}), the expression can either evaluate to a @@ -98,24 +100,28 @@ /** * Topics to listen to. - * @return a comma separated list of topics to listen from. + *

+ * SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. + * @return an array of topics to listen to */ String[] topics() default {}; /** * Topic patten to listen to. - * @return topic pattern to listen to. + *

+ * SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. + * @return topic pattern to listen to */ String topicPattern() default ""; /** - * Set to true or false, to override the default setting in the container factory. May - * be a property placeholder or SpEL expression that evaluates to a {@link Boolean} or - * a {@link String}, in which case the {@link Boolean#parseBoolean(String)} is used to - * obtain the value. + * Whether to automatically start the container for this listener. *

- * SpEL {@code #{...}} and property place holders {@code ${...}} are supported. - * @return true to auto start, false to not auto start. + * The value can be a literal string representation of boolean (e.g. {@code 'true'}) + * or a property placeholder {@code ${...}} that resolves to a literal. SpEL + * {@code #{...}} expressions that evaluate to a {@link Boolean} or a literal are + * supported. + * @return whether to automatically start the container for this listener */ String autoStartup() default ""; @@ -136,12 +142,13 @@ String beanRef() default "__listener"; /** - * Override the container factory's {@code concurrency} setting for this listener. May - * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in - * which case {@link Number#intValue()} is used to obtain the value. + * Override the container factory's {@code concurrency} setting for this listener. *

- * SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. - * @return the concurrency. + * The value can be a literal string representation of {@link Number} (e.g. + * {@code '3'}) or a property placeholder {@code ${...}} that resolves to a literal. + * SpEL {@code #{...}} expressions that evaluate to a {@link Number} or a literal are + * supported. + * @return the concurrency for this listener */ String concurrency() default ""; @@ -158,7 +165,7 @@ String useKeyOrderedProcessing() default ""; /** - * The bean name or a 'SpEL' expression that resolves to a + * The bean name or a SpEL expression that resolves to a * {@link org.apache.pulsar.client.api.DeadLetterPolicy} to use on the consumer to * configure a dead letter policy for message redelivery. * @return the bean name or empty string to not set any dead letter policy. @@ -166,9 +173,11 @@ String deadLetterPolicy() default ""; /** - * The bean name or a 'SpEL' expression that resolves to a - * {@link ReactiveMessageConsumerBuilderCustomizer} to use to configure the consumer. - * @return the bean name or empty string to not configure the consumer. + * The bean name or a SpEL expression that resolves to a + * {@link ReactivePulsarListenerMessageConsumerBuilderCustomizer} to use to configure + * the underlying consumer. + * @return the bean name or SpEL expression to the customizer or an empty string to + * not customize the consumer */ String consumerCustomizer() default ""; diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerSpelTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerSpelTests.java new file mode 100644 index 000000000..cd19db62d --- /dev/null +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerSpelTests.java @@ -0,0 +1,413 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.reactive.listener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import org.apache.pulsar.client.api.DeadLetterPolicy; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.pulsar.annotation.EnablePulsar; +import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; +import org.springframework.pulsar.reactive.config.ReactivePulsarListenerContainerFactory; +import org.springframework.pulsar.reactive.config.ReactivePulsarListenerEndpointRegistry; +import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; +import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer; +import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerSpelTests.AutoStartupAttribute.AutoStartupAttributeConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerSpelTests.ConcurrencyAttribute.ConcurrencyAttributeConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerSpelTests.ConsumerCustomizerAttribute.ConsumerCustomizerAttributeConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerSpelTests.ContainerFactoryAttribute.ContainerFactoryAttributeConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerSpelTests.DeadLetterPolicyAttribute.DeadLetterPolicyAttributeConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerSpelTests.IdAttribute.IdAttributeConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerSpelTests.SubscriptionNameAttribute.SubscriptionNameAttributeConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerSpelTests.TopicsAttribute.TopicsAttributeConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerSpelTests.UseKeyOrderedProcessingAttribute.UseKeyOrderedProcessingAttributeConfig; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.util.ReflectionTestUtils; + +/** + * Tests {@code SpEL} functionality in + * {@link ReactivePulsarListener @ReactivePulsarListener} attributes. + * + * @author Chris Bono + */ +class ReactivePulsarListenerSpelTests extends ReactivePulsarListenerTestsBase { + + private static final String TOPIC = "pulsar-reactive-listener-spel-tests-topic"; + + @Nested + @ContextConfiguration(classes = IdAttributeConfig.class) + @TestPropertySource(properties = "foo.id = foo") + class IdAttribute { + + @Test + void containerIdDerivedFromAttribute(@Autowired ReactivePulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo")).isNotNull(); + assertThat(registry.getListenerContainer("bar")).isNotNull(); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class IdAttributeConfig { + + @ReactivePulsarListener(topics = TOPIC, id = "${foo.id}") + void listen1(String ignored) { + } + + @ReactivePulsarListener(topics = TOPIC, id = "#{T(java.lang.String).valueOf('bar')}") + void listen2(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = SubscriptionNameAttributeConfig.class) + @TestPropertySource(properties = "foo.subscriptionName = fooSub") + class SubscriptionNameAttribute { + + @Test + void subscriptionNameDerivedFromAttribute(@Autowired ReactivePulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo").getContainerProperties().getSubscriptionName()) + .isEqualTo("fooSub"); + assertThat(registry.getListenerContainer("bar").getContainerProperties().getSubscriptionName()) + .isEqualTo("barSub"); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class SubscriptionNameAttributeConfig { + + @ReactivePulsarListener(topics = TOPIC, id = "foo", subscriptionName = "${foo.subscriptionName}") + void listen1(String ignored) { + } + + @ReactivePulsarListener(topics = TOPIC, id = "bar", + subscriptionName = "#{T(java.lang.String).valueOf('barSub')}") + void listen2(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = TopicsAttributeConfig.class) + @TestPropertySource(properties = { "foo.topics = foo", "foo.topicPattern = foo*" }) + class TopicsAttribute { + + @Test + void topicsDerivedFromAttribute(@Autowired ReactivePulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo").getContainerProperties().getTopics()) + .containsExactly("foo"); + assertThat(registry.getListenerContainer("bar").getContainerProperties().getTopics()) + .containsExactly("bar"); + assertThat(registry.getListenerContainer("zaa").getContainerProperties().getTopicsPattern().pattern()) + .isEqualTo("foo*"); + assertThat(registry.getListenerContainer("laa").getContainerProperties().getTopicsPattern().pattern()) + .isEqualTo("bar*"); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class TopicsAttributeConfig { + + @ReactivePulsarListener(topics = "${foo.topics}", id = "foo") + void listen1(String ignored) { + } + + @ReactivePulsarListener(topics = "#{T(java.lang.String).valueOf('bar')}", id = "bar") + void listen2(String ignored) { + } + + @ReactivePulsarListener(topicPattern = "${foo.topicPattern}", id = "zaa") + void listen3(String ignored) { + } + + @ReactivePulsarListener(topicPattern = "#{T(java.lang.String).valueOf('bar*')}", id = "laa") + void listen4(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = ContainerFactoryAttributeConfig.class) + class ContainerFactoryAttribute { + + @Test + void containerFactoryDerivedFromAttribute( + @Autowired ReactivePulsarListenerContainerFactory containerFactory) { + verify(containerFactory).createListenerContainer(argThat(endpoint -> endpoint.getId().equals("foo"))); + verify(containerFactory).createListenerContainer(argThat(endpoint -> endpoint.getId().equals("bar"))); + verify(containerFactory).createListenerContainer(argThat(endpoint -> endpoint.getId().equals("zaa"))); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class ContainerFactoryAttributeConfig { + + @SuppressWarnings({ "unchecked", "SpringJavaInjectionPointsAutowiringInspection" }) + @Bean + @Primary + ReactivePulsarListenerContainerFactory customContainerFactory( + ReactivePulsarConsumerFactory pulsarConsumerFactory) { + return spy(new DefaultReactivePulsarListenerContainerFactory<>(pulsarConsumerFactory, + new ReactivePulsarContainerProperties<>())); + } + + @ReactivePulsarListener(topics = TOPIC, id = "foo", containerFactory = "#{@customContainerFactory}") + void listen1(String ignored) { + } + + @ReactivePulsarListener(topics = TOPIC, id = "bar", + containerFactory = "#{T(java.lang.String).valueOf('customContainerFactory')}") + void listen2(String ignored) { + } + + @ReactivePulsarListener(topics = TOPIC, id = "zaa", containerFactory = "customContainerFactory") + void listen3(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = AutoStartupAttributeConfig.class) + @TestPropertySource(properties = "foo.auto-start = true") + class AutoStartupAttribute { + + @Test + void containerAutoStartupDerivedFromAttribute( + @Autowired ReactivePulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo").isAutoStartup()).isTrue(); + assertThat(registry.getListenerContainer("bar").isAutoStartup()).isFalse(); + assertThat(registry.getListenerContainer("zaa").isAutoStartup()).isTrue(); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class AutoStartupAttributeConfig { + + @ReactivePulsarListener(topics = TOPIC, id = "foo", autoStartup = "${foo.auto-start}") + void listen1(String ignored) { + } + + @ReactivePulsarListener(topics = TOPIC, id = "bar", + autoStartup = "#{T(java.lang.Boolean).valueOf('false')}") + void listen2(String ignored) { + } + + @ReactivePulsarListener(topics = TOPIC, id = "zaa", autoStartup = "true") + void listen3(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = ConcurrencyAttributeConfig.class) + @TestPropertySource(properties = "foo.concurrency = 2") + class ConcurrencyAttribute { + + @Test + void containerAutoStartupDerivedFromAttribute( + @Autowired ReactivePulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo").getContainerProperties().getConcurrency()).isEqualTo(2); + assertThat(registry.getListenerContainer("bar").getContainerProperties().getConcurrency()).isEqualTo(3); + assertThat(registry.getListenerContainer("zaa").getContainerProperties().getConcurrency()).isEqualTo(4); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class ConcurrencyAttributeConfig { + + @ReactivePulsarListener(topics = TOPIC, id = "foo", concurrency = "${foo.concurrency}", + subscriptionType = SubscriptionType.Shared) + void listen1(String ignored) { + } + + @ReactivePulsarListener(topics = TOPIC, id = "bar", concurrency = "#{T(java.lang.Integer).valueOf('3')}", + subscriptionType = SubscriptionType.Shared) + void listen2(String ignored) { + } + + @ReactivePulsarListener(topics = TOPIC, id = "zaa", concurrency = "4", + subscriptionType = SubscriptionType.Shared) + void listen3(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = UseKeyOrderedProcessingAttributeConfig.class) + @TestPropertySource(properties = "foo.key-ordered = true") + class UseKeyOrderedProcessingAttribute { + + @Test + void containerUseKeyOrderedProcessingDerivedFromAttribute( + @Autowired ReactivePulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo").getContainerProperties().isUseKeyOrderedProcessing()) + .isTrue(); + assertThat(registry.getListenerContainer("bar").getContainerProperties().isUseKeyOrderedProcessing()) + .isFalse(); + assertThat(registry.getListenerContainer("zaa").getContainerProperties().isUseKeyOrderedProcessing()) + .isTrue(); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class UseKeyOrderedProcessingAttributeConfig { + + @ReactivePulsarListener(topics = TOPIC, id = "foo", useKeyOrderedProcessing = "${foo.key-ordered}") + void listen1(String ignored) { + } + + @ReactivePulsarListener(topics = TOPIC, id = "bar", + useKeyOrderedProcessing = "#{T(java.lang.Boolean).valueOf('false')}") + void listen2(String ignored) { + } + + @ReactivePulsarListener(topics = TOPIC, id = "zaa", useKeyOrderedProcessing = "true") + void listen3(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = DeadLetterPolicyAttributeConfig.class) + class DeadLetterPolicyAttribute { + + @Test + void deadLetterPolicyDerivedFromAttribute(@Autowired ReactivePulsarListenerEndpointRegistry registry) { + assertDeadLetterPolicy(registry, "foo"); + assertDeadLetterPolicy(registry, "bar"); + assertDeadLetterPolicy(registry, "zaa"); + } + + private void assertDeadLetterPolicy(ReactivePulsarListenerEndpointRegistry registry, + String containerId) { + assertThat(registry.getListenerContainer(containerId)).extracting("pulsarConsumerFactory") + .extracting("topicNameToConsumerSpec", + InstanceOfAssertFactories.map(String.class, ReactiveMessageConsumerSpec.class)) + .extractingByKey("%s-topic".formatted(containerId)) + .extracting(ReactiveMessageConsumerSpec::getDeadLetterPolicy) + .isSameAs(DeadLetterPolicyAttributeConfig.CUSTOM_DLP); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class DeadLetterPolicyAttributeConfig { + + static DeadLetterPolicy CUSTOM_DLP = DeadLetterPolicy.builder() + .deadLetterTopic("dlt") + .maxRedeliverCount(2) + .build(); + + @Bean + DeadLetterPolicy customDeadLetterPolicy() { + return CUSTOM_DLP; + } + + @ReactivePulsarListener(id = "foo", topics = "foo-topic", deadLetterPolicy = "#{@customDeadLetterPolicy}") + void listen1(String ignored) { + } + + @ReactivePulsarListener(id = "bar", topics = "bar-topic", + deadLetterPolicy = "#{T(java.lang.String).valueOf('customDeadLetterPolicy')}") + void listen2(String ignored) { + } + + @ReactivePulsarListener(id = "zaa", topics = "zaa-topic", deadLetterPolicy = "customDeadLetterPolicy") + void listen3(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = ConsumerCustomizerAttributeConfig.class) + class ConsumerCustomizerAttribute { + + @Test + void consumerCustomizerDerivedFromAttribute() { + assertThat(ConsumerCustomizerAttributeConfig.CUSTOMIZED_CONTAINERS_SUBSCRIPTION_NAMES) + .containsExactlyInAnyOrder("fooSub", "barSub", "zaaSub"); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class ConsumerCustomizerAttributeConfig { + + static List CUSTOMIZED_CONTAINERS_SUBSCRIPTION_NAMES = new ArrayList<>(); + + @Bean + ReactivePulsarListenerMessageConsumerBuilderCustomizer customConsumerCustomizer() { + return (builder) -> { + var conf = ReflectionTestUtils.getField(builder, "consumerSpec"); + assertThat(conf).isNotNull(); + CUSTOMIZED_CONTAINERS_SUBSCRIPTION_NAMES + .add(Objects.toString(ReflectionTestUtils.getField(conf, "subscriptionName"), "???")); + }; + } + + @ReactivePulsarListener(topics = TOPIC, id = "foo", subscriptionName = "fooSub", + consumerCustomizer = "#{@customConsumerCustomizer}") + void listen1(String ignored) { + } + + @ReactivePulsarListener(topics = TOPIC, id = "bar", subscriptionName = "barSub", + consumerCustomizer = "#{T(java.lang.String).valueOf('customConsumerCustomizer')}") + void listen2(String ignored) { + } + + @ReactivePulsarListener(topics = TOPIC, id = "zaa", subscriptionName = "zaaSub", + consumerCustomizer = "customConsumerCustomizer") + void listen3(String ignored) { + } + + } + + } + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListener.java b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListener.java index 5da376134..0102ef241 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListener.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,7 +61,7 @@ *

* If none is specified an auto-generated id is used. *

- * SpEL {@code #{...}} and property place holders {@code ${...}} are supported. + * SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. * @return the {@code id} for the container managing for this endpoint. * @see PulsarListenerEndpointRegistry#getListenerContainer(String) */ @@ -69,7 +69,9 @@ /** * Pulsar subscription name associated with this listener. - * @return the {@code subscriptionName} for this Pulsar listener endpoint. + *

+ * SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. + * @return the subscription name for this listener */ String subscriptionName() default ""; @@ -100,24 +102,28 @@ /** * Topics to listen to. - * @return a comma separated list of topics to listen from. + *

+ * SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. + * @return an array of topics to listen to */ String[] topics() default {}; /** * Topic patten to listen to. - * @return topic pattern to listen to. + *

+ * SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. + * @return topic pattern to listen to */ String topicPattern() default ""; /** - * Set to true or false, to override the default setting in the container factory. May - * be a property placeholder or SpEL expression that evaluates to a {@link Boolean} or - * a {@link String}, in which case the {@link Boolean#parseBoolean(String)} is used to - * obtain the value. + * Whether to automatically start the container for this listener. *

- * SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. - * @return true to auto start, false to not auto start. + * The value can be a literal string representation of boolean (e.g. {@code 'true'}) + * or a property placeholder {@code ${...}} that resolves to a literal. SpEL + * {@code #{...}} expressions that evaluate to a {@link Boolean} or a literal are + * supported. + * @return whether to automatically start the container for this listener */ String autoStartup() default ""; @@ -153,7 +159,7 @@ * * {@code group.id} and {@code client.id} are ignored. *

- * SpEL {@code #{...}} and property place holders {@code ${...}} are supported. SpEL + * SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. SpEL * expressions must resolve to a {@link String}, a @{link String[]} or a * {@code Collection} where each member of the array or collection is a * property name + value with the above formats. @@ -162,17 +168,18 @@ String[] properties() default {}; /** - * Override the container factory's {@code concurrency} setting for this listener. May - * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in - * which case {@link Number#intValue()} is used to obtain the value. + * Override the container factory's {@code concurrency} setting for this listener. *

- * SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. - * @return the concurrency. + * The value can be a literal string representation of {@link Number} (e.g. + * {@code '3'}) or a property placeholder {@code ${...}} that resolves to a literal. + * SpEL {@code #{...}} expressions that evaluate to a {@link Number} or a literal are + * supported. + * @return the concurrency for this listener */ String concurrency() default ""; /** - * The bean name or a 'SpEL' expression that resolves to a + * The bean name or a SpEL expression that resolves to a * {@link org.apache.pulsar.client.api.RedeliveryBackoff} to use on the consumer to * control the redelivery backoff of messages after a negative ack. * @return the bean name or empty string to not set the backoff. @@ -180,7 +187,7 @@ String negativeAckRedeliveryBackoff() default ""; /** - * The bean name or a 'SpEL' expression that resolves to a + * The bean name or a SpEL expression that resolves to a * {@link org.apache.pulsar.client.api.RedeliveryBackoff} to use on the consumer to * control the redelivery backoff of messages after an acknowledgment timeout. * @return the bean name or empty string to not set the backoff. @@ -188,7 +195,7 @@ String ackTimeoutRedeliveryBackoff() default ""; /** - * The bean name or a 'SpEL' expression that resolves to a + * The bean name or a SpEL expression that resolves to a * {@link org.apache.pulsar.client.api.DeadLetterPolicy} to use on the consumer to * configure a dead letter policy for message redelivery. * @return the bean name or empty string to not set any dead letter policy. @@ -202,7 +209,7 @@ AckMode ackMode() default AckMode.BATCH; /** - * The bean name or a 'SpEL' expression that resolves to a + * The bean name or a SpEL expression that resolves to a * {@link org.springframework.pulsar.listener.PulsarConsumerErrorHandler} which is * used as a Spring provided mechanism to handle errors from processing the message. * @return the bean name for the consumer error handler or an empty string. @@ -210,7 +217,7 @@ String pulsarConsumerErrorHandler() default ""; /** - * The bean name or a 'SpEL' expression that resolves to a + * The bean name or a SpEL expression that resolves to a * {@link PulsarListenerConsumerBuilderCustomizer} to use to configure the underlying * consumer. * @return the bean name or SpEL expression to the customizer or an empty string to diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarReader.java b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarReader.java index 3dbb9a061..1b7c8834d 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarReader.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarReader.java @@ -27,7 +27,6 @@ import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.pulsar.config.PulsarReaderEndpointRegistry; -import org.springframework.pulsar.core.ReaderBuilderCustomizer; @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE }) @Retention(RetentionPolicy.RUNTIME) @@ -36,12 +35,12 @@ public @interface PulsarReader { /** - * The unique identifier of the container for this listener. + * The unique identifier of the container for this reader. *

* If none is specified an auto-generated id is used. *

* SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. - * @return the {@code id} for the container managing for this endpoint. + * @return the id of the container for this reader * @see PulsarReaderEndpointRegistry#getReaderContainer(String) */ String id() default ""; @@ -71,8 +70,8 @@ String autoStartup() default ""; /** - * The bean name or a 'SpEL' expression that resolves to a - * {@link ReaderBuilderCustomizer} to use to configure the reader. + * The bean name or a SpEL expression that resolves to a + * {@link PulsarReaderReaderBuilderCustomizer} to use to configure the reader. * @return the bean name or empty string to not configure the reader. */ String readerCustomizer() default ""; diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerSpelTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerSpelTests.java new file mode 100644 index 000000000..6ce27a159 --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerSpelTests.java @@ -0,0 +1,544 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.listener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import org.apache.pulsar.client.api.DeadLetterPolicy; +import org.apache.pulsar.client.api.RedeliveryBackoff; +import org.apache.pulsar.client.api.SubscriptionType; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.pulsar.annotation.EnablePulsar; +import org.springframework.pulsar.annotation.PulsarListener; +import org.springframework.pulsar.annotation.PulsarListenerConsumerBuilderCustomizer; +import org.springframework.pulsar.config.PulsarListenerContainerFactory; +import org.springframework.pulsar.config.PulsarListenerEndpoint; +import org.springframework.pulsar.config.PulsarListenerEndpointRegistry; +import org.springframework.pulsar.listener.PulsarListenerSpelTests.AckRedeliveryBackoffAttribute.AckRedeliveryBackoffAttributeConfig; +import org.springframework.pulsar.listener.PulsarListenerSpelTests.AutoStartupAttribute.AutoStartupAttributeConfig; +import org.springframework.pulsar.listener.PulsarListenerSpelTests.ConcurrencyAttribute.ConcurrencyAttributeConfig; +import org.springframework.pulsar.listener.PulsarListenerSpelTests.ConsumerCustomizerAttribute.ConsumerCustomizerAttributeConfig; +import org.springframework.pulsar.listener.PulsarListenerSpelTests.ConsumerErrorHandlerAttribute.ConsumerErrorHandlerAttributeConfig; +import org.springframework.pulsar.listener.PulsarListenerSpelTests.ContainerFactoryAttribute.ContainerFactoryAttributeConfig; +import org.springframework.pulsar.listener.PulsarListenerSpelTests.DeadLetterPolicyAttribute.DeadLetterPolicyAttributeConfig; +import org.springframework.pulsar.listener.PulsarListenerSpelTests.IdAttribute.IdAttributeConfig; +import org.springframework.pulsar.listener.PulsarListenerSpelTests.NackRedeliveryBackoffAttribute.NackRedeliveryBackoffAttributeConfig; +import org.springframework.pulsar.listener.PulsarListenerSpelTests.PropertiesAttribute.PropertiesAttributeConfig; +import org.springframework.pulsar.listener.PulsarListenerSpelTests.SubscriptionNameAttribute.SubscriptionNameAttributeConfig; +import org.springframework.pulsar.listener.PulsarListenerSpelTests.TopicsAttribute.TopicsAttributeConfig; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.util.ReflectionTestUtils; + +/** + * Tests {@code SpEL} functionality in {@link PulsarListener @PulsarListener} attributes. + * + * @author Chris Bono + */ +class PulsarListenerSpelTests extends PulsarListenerTestsBase { + + private static final String TOPIC = "pulsar-listener-spel-tests-topic"; + + @Nested + @ContextConfiguration(classes = IdAttributeConfig.class) + @TestPropertySource(properties = "foo.id = foo") + class IdAttribute { + + @Test + void containerIdDerivedFromAttribute(@Autowired PulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo")).isNotNull(); + assertThat(registry.getListenerContainer("bar")).isNotNull(); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class IdAttributeConfig { + + @PulsarListener(topics = TOPIC, id = "${foo.id}") + void listen1(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "#{T(java.lang.String).valueOf('bar')}") + void listen2(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = SubscriptionNameAttributeConfig.class) + @TestPropertySource(properties = "foo.subscriptionName = fooSub") + class SubscriptionNameAttribute { + + @Test + void subscriptionNameDerivedFromAttribute(@Autowired PulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo").getContainerProperties().getSubscriptionName()) + .isEqualTo("fooSub"); + assertThat(registry.getListenerContainer("bar").getContainerProperties().getSubscriptionName()) + .isEqualTo("barSub"); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class SubscriptionNameAttributeConfig { + + @PulsarListener(topics = TOPIC, id = "foo", subscriptionName = "${foo.subscriptionName}") + void listen1(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "bar", subscriptionName = "#{T(java.lang.String).valueOf('barSub')}") + void listen2(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = TopicsAttributeConfig.class) + @TestPropertySource(properties = { "foo.topics = foo", "foo.topicPattern = foo*" }) + class TopicsAttribute { + + @Test + void topicsDerivedFromAttribute(@Autowired PulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo").getContainerProperties().getTopics()) + .containsExactly("foo"); + assertThat(registry.getListenerContainer("bar").getContainerProperties().getTopics()) + .containsExactly("bar"); + assertThat(registry.getListenerContainer("zaa").getContainerProperties().getTopicsPattern()) + .isEqualTo("foo*"); + assertThat(registry.getListenerContainer("laa").getContainerProperties().getTopicsPattern()) + .isEqualTo("bar*"); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class TopicsAttributeConfig { + + @PulsarListener(topics = "${foo.topics}", id = "foo") + void listen1(String ignored) { + } + + @PulsarListener(topics = "#{T(java.lang.String).valueOf('bar')}", id = "bar") + void listen2(String ignored) { + } + + @PulsarListener(topicPattern = "${foo.topicPattern}", id = "zaa") + void listen3(String ignored) { + } + + @PulsarListener(topicPattern = "#{T(java.lang.String).valueOf('bar*')}", id = "laa") + void listen4(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = ContainerFactoryAttributeConfig.class) + class ContainerFactoryAttribute { + + @Test + void containerFactoryDerivedFromAttribute(@Autowired PulsarListenerContainerFactory containerFactory) { + verify(containerFactory).createListenerContainer(argThat(endpoint -> endpoint.getId().equals("foo"))); + verify(containerFactory).createListenerContainer(argThat(endpoint -> endpoint.getId().equals("bar"))); + verify(containerFactory).createListenerContainer(argThat(endpoint -> endpoint.getId().equals("zaa"))); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class ContainerFactoryAttributeConfig { + + @Bean + @Primary + PulsarListenerContainerFactory customContainerFactory() { + var mockContainerFactory = mock(PulsarListenerContainerFactory.class); + AbstractPulsarMessageListenerContainer mockContainer = mock( + AbstractPulsarMessageListenerContainer.class); + when(mockContainerFactory.createListenerContainer(any(PulsarListenerEndpoint.class))) + .thenReturn(mockContainer); + return mockContainerFactory; + } + + @PulsarListener(topics = TOPIC, id = "foo", containerFactory = "#{@customContainerFactory}") + void listen1(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "bar", + containerFactory = "#{T(java.lang.String).valueOf('customContainerFactory')}") + void listen2(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "zaa", containerFactory = "customContainerFactory") + void listen3(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = AutoStartupAttributeConfig.class) + @TestPropertySource(properties = "foo.auto-start = true") + class AutoStartupAttribute { + + @Test + void containerAutoStartupDerivedFromAttribute(@Autowired PulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo").isAutoStartup()).isTrue(); + assertThat(registry.getListenerContainer("bar").isAutoStartup()).isFalse(); + assertThat(registry.getListenerContainer("zaa").isAutoStartup()).isTrue(); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class AutoStartupAttributeConfig { + + @PulsarListener(topics = TOPIC, id = "foo", autoStartup = "${foo.auto-start}") + void listen1(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "bar", autoStartup = "#{T(java.lang.Boolean).valueOf('false')}") + void listen2(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "zaa", autoStartup = "true") + void listen3(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = PropertiesAttributeConfig.class) + @TestPropertySource(properties = { "foo.mykey = subscriptionName", "foo.myvalue = fooSub" }) + class PropertiesAttribute { + + @Test + void containerPropsDerivedFromAttribute(@Autowired PulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo").getContainerProperties().getPulsarConsumerProperties()) + .containsEntry("subscriptionName", "fooSub"); + assertThat(registry.getListenerContainer("bar").getContainerProperties().getPulsarConsumerProperties()) + .containsEntry("subscriptionName", "barSub"); + assertThat(registry.getListenerContainer("zaa").getContainerProperties().getPulsarConsumerProperties()) + .contains(entry("subscriptionName", "zaaSub"), entry("consumerName", "zaaConsumer")); + assertThat(registry.getListenerContainer("laa").getContainerProperties().getPulsarConsumerProperties()) + .contains(entry("subscriptionName", "laaSub"), entry("consumerName", "laaConsumer")); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class PropertiesAttributeConfig { + + @Bean + String[] stringArray() { + return new String[] { "subscriptionName=zaaSub", "consumerName=zaaConsumer" }; + } + + @Bean + List stringList() { + return List.of("subscriptionName=laaSub", "consumerName=laaConsumer"); + } + + @PulsarListener(topics = TOPIC, id = "foo", properties = { "${foo.mykey}=${foo.myvalue}" }) + void listen1(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "bar", + properties = "#{T(java.lang.String).valueOf('subscriptionName=barSub')}") + void listen2(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "zaa", properties = "#{@stringArray}") + void listen3(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "laa", properties = "#{@stringList}") + void listen4(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = ConcurrencyAttributeConfig.class) + @TestPropertySource(properties = "foo.concurrency = 2") + class ConcurrencyAttribute { + + @Test + void containerAutoStartupDerivedFromAttribute(@Autowired PulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo")).hasFieldOrPropertyWithValue("concurrency", 2); + assertThat(registry.getListenerContainer("bar")).hasFieldOrPropertyWithValue("concurrency", 3); + assertThat(registry.getListenerContainer("zaa")).hasFieldOrPropertyWithValue("concurrency", 4); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class ConcurrencyAttributeConfig { + + @PulsarListener(topics = TOPIC, id = "foo", concurrency = "${foo.concurrency}", + subscriptionType = SubscriptionType.Shared) + void listen1(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "bar", concurrency = "#{T(java.lang.Integer).valueOf('3')}", + subscriptionType = SubscriptionType.Shared) + void listen2(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "zaa", concurrency = "4", subscriptionType = SubscriptionType.Shared) + void listen3(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = NackRedeliveryBackoffAttributeConfig.class) + class NackRedeliveryBackoffAttribute { + + @Test + void nackRedeliveryBackoffDerivedFromAttribute(@Autowired PulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo")).extracting("negativeAckRedeliveryBackoff") + .isSameAs(NackRedeliveryBackoffAttributeConfig.CUSTOM_BACKOFF); + assertThat(registry.getListenerContainer("bar")).extracting("negativeAckRedeliveryBackoff") + .isSameAs(NackRedeliveryBackoffAttributeConfig.CUSTOM_BACKOFF); + assertThat(registry.getListenerContainer("zaa")).extracting("negativeAckRedeliveryBackoff") + .isSameAs(NackRedeliveryBackoffAttributeConfig.CUSTOM_BACKOFF); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class NackRedeliveryBackoffAttributeConfig { + + static RedeliveryBackoff CUSTOM_BACKOFF = (i) -> i; + + @Bean + RedeliveryBackoff customNackRedeliveryBackoff() { + return CUSTOM_BACKOFF; + } + + @PulsarListener(topics = TOPIC, id = "foo", + negativeAckRedeliveryBackoff = "#{@customNackRedeliveryBackoff}") + void listen1(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "bar", + negativeAckRedeliveryBackoff = "#{T(java.lang.String).valueOf('customNackRedeliveryBackoff')}") + void listen2(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "zaa", negativeAckRedeliveryBackoff = "customNackRedeliveryBackoff") + void listen3(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = AckRedeliveryBackoffAttributeConfig.class) + class AckRedeliveryBackoffAttribute { + + @Test + void ackRedeliveryBackoffDerivedFromAttribute(@Autowired PulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo")).extracting("ackTimeoutRedeliveryBackoff") + .isSameAs(AckRedeliveryBackoffAttributeConfig.CUSTOM_BACKOFF); + assertThat(registry.getListenerContainer("bar")).extracting("ackTimeoutRedeliveryBackoff") + .isSameAs(AckRedeliveryBackoffAttributeConfig.CUSTOM_BACKOFF); + assertThat(registry.getListenerContainer("zaa")).extracting("ackTimeoutRedeliveryBackoff") + .isSameAs(AckRedeliveryBackoffAttributeConfig.CUSTOM_BACKOFF); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class AckRedeliveryBackoffAttributeConfig { + + static RedeliveryBackoff CUSTOM_BACKOFF = (i) -> i; + + @Bean + RedeliveryBackoff customAckRedeliveryBackoff() { + return CUSTOM_BACKOFF; + } + + @PulsarListener(topics = TOPIC, id = "foo", ackTimeoutRedeliveryBackoff = "#{@customAckRedeliveryBackoff}") + void listen1(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "bar", + ackTimeoutRedeliveryBackoff = "#{T(java.lang.String).valueOf('customAckRedeliveryBackoff')}") + void listen2(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "zaa", ackTimeoutRedeliveryBackoff = "customAckRedeliveryBackoff") + void listen3(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = DeadLetterPolicyAttributeConfig.class) + class DeadLetterPolicyAttribute { + + @Test + void deadLetterPolicyDerivedFromAttribute(@Autowired PulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo")).extracting("deadLetterPolicy") + .isSameAs(DeadLetterPolicyAttributeConfig.CUSTOM_DLP); + assertThat(registry.getListenerContainer("bar")).extracting("deadLetterPolicy") + .isSameAs(DeadLetterPolicyAttributeConfig.CUSTOM_DLP); + assertThat(registry.getListenerContainer("zaa")).extracting("deadLetterPolicy") + .isSameAs(DeadLetterPolicyAttributeConfig.CUSTOM_DLP); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class DeadLetterPolicyAttributeConfig { + + static DeadLetterPolicy CUSTOM_DLP = DeadLetterPolicy.builder().deadLetterTopic("dlt").build(); + + @Bean + DeadLetterPolicy customDeadLetterPolicy() { + return CUSTOM_DLP; + } + + @PulsarListener(topics = TOPIC, id = "foo", deadLetterPolicy = "#{@customDeadLetterPolicy}") + void listen1(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "bar", + deadLetterPolicy = "#{T(java.lang.String).valueOf('customDeadLetterPolicy')}") + void listen2(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "zaa", deadLetterPolicy = "customDeadLetterPolicy") + void listen3(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = ConsumerErrorHandlerAttributeConfig.class) + class ConsumerErrorHandlerAttribute { + + @Test + void consumerErrorHandlerDerivedFromAttribute(@Autowired PulsarListenerEndpointRegistry registry) { + assertThat(registry.getListenerContainer("foo")).extracting("pulsarConsumerErrorHandler") + .isSameAs(ConsumerErrorHandlerAttributeConfig.CUSTOM_ERROR_HANDLER); + assertThat(registry.getListenerContainer("bar")).extracting("pulsarConsumerErrorHandler") + .isSameAs(ConsumerErrorHandlerAttributeConfig.CUSTOM_ERROR_HANDLER); + assertThat(registry.getListenerContainer("zaa")).extracting("pulsarConsumerErrorHandler") + .isSameAs(ConsumerErrorHandlerAttributeConfig.CUSTOM_ERROR_HANDLER); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class ConsumerErrorHandlerAttributeConfig { + + static PulsarConsumerErrorHandler CUSTOM_ERROR_HANDLER = mock(PulsarConsumerErrorHandler.class); + + @Bean + PulsarConsumerErrorHandler customConsumerErrorHandler() { + return CUSTOM_ERROR_HANDLER; + } + + @PulsarListener(topics = TOPIC, id = "foo", pulsarConsumerErrorHandler = "#{@customConsumerErrorHandler}") + void listen1(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "bar", + pulsarConsumerErrorHandler = "#{T(java.lang.String).valueOf('customConsumerErrorHandler')}") + void listen2(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "zaa", pulsarConsumerErrorHandler = "customConsumerErrorHandler") + void listen3(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = ConsumerCustomizerAttributeConfig.class) + class ConsumerCustomizerAttribute { + + @Test + void consumerCustomizerDerivedFromAttribute() { + assertThat(ConsumerCustomizerAttributeConfig.CUSTOMIZED_CONTAINERS_SUBSCRIPTION_NAMES) + .containsExactlyInAnyOrder("fooSub", "barSub", "zaaSub"); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class ConsumerCustomizerAttributeConfig { + + static List CUSTOMIZED_CONTAINERS_SUBSCRIPTION_NAMES = new ArrayList<>(); + + @Bean + PulsarListenerConsumerBuilderCustomizer customConsumerCustomizer() { + return (builder) -> { + var conf = ReflectionTestUtils.getField(builder, "conf"); + assertThat(conf).isNotNull(); + CUSTOMIZED_CONTAINERS_SUBSCRIPTION_NAMES + .add(Objects.toString(ReflectionTestUtils.getField(conf, "subscriptionName"), "???")); + }; + } + + @PulsarListener(topics = TOPIC, id = "foo", subscriptionName = "fooSub", + consumerCustomizer = "#{@customConsumerCustomizer}") + void listen1(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "bar", subscriptionName = "barSub", + consumerCustomizer = "#{T(java.lang.String).valueOf('customConsumerCustomizer')}") + void listen2(String ignored) { + } + + @PulsarListener(topics = TOPIC, id = "zaa", subscriptionName = "zaaSub", + consumerCustomizer = "customConsumerCustomizer") + void listen3(String ignored) { + } + + } + + } + +} diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderSpelTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderSpelTests.java new file mode 100644 index 000000000..367402894 --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderSpelTests.java @@ -0,0 +1,122 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.reader; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.pulsar.annotation.EnablePulsar; +import org.springframework.pulsar.annotation.PulsarReader; +import org.springframework.pulsar.annotation.PulsarReaderReaderBuilderCustomizer; +import org.springframework.pulsar.config.PulsarReaderEndpointRegistry; +import org.springframework.pulsar.reader.PulsarReaderSpelTests.IdAttribute.IdAttributeConfig; +import org.springframework.pulsar.reader.PulsarReaderSpelTests.ReaderCustomizerAttribute.ReaderCustomizerAttributeConfig; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.util.ReflectionTestUtils; + +/** + * Tests {@code SpEL} functionality in {@link PulsarReader @PulsarReader} attributes. + * + * @author Chris Bono + */ +class PulsarReaderSpelTests extends PulsarReaderTestsBase { + + private static final String TOPIC = "pulsar-reader-spel-tests-topic"; + + @Nested + @ContextConfiguration(classes = IdAttributeConfig.class) + @TestPropertySource(properties = "foo.id = foo") + class IdAttribute { + + @Test + void containerIdDerivedFromAttribute(@Autowired PulsarReaderEndpointRegistry registry) { + assertThat(registry.getReaderContainer("foo")).isNotNull(); + assertThat(registry.getReaderContainer("bar")).isNotNull(); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class IdAttributeConfig { + + @PulsarReader(id = "${foo.id}", topics = TOPIC, startMessageId = "earliest") + void listen1(String ignored) { + } + + @PulsarReader(id = "#{T(java.lang.String).valueOf('bar')}", topics = TOPIC, startMessageId = "earliest") + void listen2(String ignored) { + } + + } + + } + + @Nested + @ContextConfiguration(classes = ReaderCustomizerAttributeConfig.class) + class ReaderCustomizerAttribute { + + @Test + void readerCustomizerDerivedFromAttribute() { + assertThat(ReaderCustomizerAttributeConfig.CUSTOMIZED_CONTAINERS_TOPIC_NAMES) + .containsExactlyInAnyOrder("foo-topic", "bar-topic", "zaa-topic"); + } + + @EnablePulsar + @Configuration(proxyBeanMethods = false) + static class ReaderCustomizerAttributeConfig { + + static List CUSTOMIZED_CONTAINERS_TOPIC_NAMES = new ArrayList<>(); + + @SuppressWarnings("unchecked") + @Bean + PulsarReaderReaderBuilderCustomizer customReaderCustomizer() { + return (builder) -> { + var conf = ReflectionTestUtils.getField(builder, "conf"); + var topicNames = (Set) ReflectionTestUtils.getField(conf, "topicNames"); + CUSTOMIZED_CONTAINERS_TOPIC_NAMES.addAll(topicNames); + }; + } + + @PulsarReader(id = "foo", readerCustomizer = "#{@customReaderCustomizer}", topics = "foo-topic", + startMessageId = "earliest") + void listen1(String ignored) { + } + + @PulsarReader(id = "bar", readerCustomizer = "#{T(java.lang.String).valueOf('customReaderCustomizer')}", + topics = "bar-topic", startMessageId = "earliest") + void listen2(String ignored) { + } + + @PulsarReader(id = "zaa", readerCustomizer = "customReaderCustomizer", topics = "zaa-topic", + startMessageId = "earliest") + void listen3(String ignored) { + } + + } + + } + +}