Skip to content

Add Spring Pulsar transaction support #40189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.thread.Threading;
import org.springframework.boot.util.LambdaSafe;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.env.Environment;
Expand All @@ -57,6 +59,8 @@
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
import org.springframework.pulsar.transaction.PulsarTransactionManager;

/**
* {@link EnableAutoConfiguration Auto-configuration} for Apache Pulsar.
Expand Down Expand Up @@ -126,8 +130,11 @@ private void applyProducerBuilderCustomizers(List<ProducerBuilderCustomizer<?>>
PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
ObjectProvider<ProducerInterceptor> producerInterceptors, SchemaResolver schemaResolver,
TopicResolver topicResolver) {
return new PulsarTemplate<>(pulsarProducerFactory, producerInterceptors.orderedStream().toList(),
schemaResolver, topicResolver, this.properties.getTemplate().isObservationsEnabled());
PulsarTemplate<?> template = new PulsarTemplate<>(pulsarProducerFactory,
producerInterceptors.orderedStream().toList(), schemaResolver, topicResolver,
this.properties.getTemplate().isObservationsEnabled());
this.propertiesMapper.customizeTemplate(template);
return template;
}

@Bean
Expand All @@ -142,6 +149,13 @@ DefaultPulsarConsumerFactory<?> pulsarConsumerFactory(PulsarClient pulsarClient,
return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers);
}

@Bean
@ConditionalOnMissingBean
@Conditional(TransactionsEnabledCondition.class)
public PulsarAwareTransactionManager pulsarTransactionManager(PulsarClient pulsarClient) {
return new PulsarTransactionManager(pulsarClient);
}

@SuppressWarnings("unchecked")
private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>> customizers,
ConsumerBuilder<?> builder) {
Expand All @@ -153,13 +167,15 @@ private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>>
@ConditionalOnMissingBean(name = "pulsarListenerContainerFactory")
ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
TopicResolver topicResolver, Environment environment) {
TopicResolver topicResolver, ObjectProvider<PulsarAwareTransactionManager> pulsarTransactionManager,
Environment environment) {
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver);
if (Threading.VIRTUAL.isActive(environment)) {
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor("pulsar-consumer-"));
}
pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager);
this.propertiesMapper.customizeContainerProperties(containerProperties);
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
}
Expand Down Expand Up @@ -203,4 +219,26 @@ static class EnablePulsarConfiguration {

}

/**
* Custom condition that is true when transactions have been enabled in the template
* and/or in the listener.
*/
static final class TransactionsEnabledCondition extends AnyNestedCondition {

TransactionsEnabledCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}

@ConditionalOnProperty(prefix = "spring.pulsar.template.transaction", name = "enabled", havingValue = "true")
static class TemplateTransactionEnabledCondition {

}

@ConditionalOnProperty(prefix = "spring.pulsar.listener.transaction", name = "enabled", havingValue = "true")
static class ListenerTransactionEnabledCondition {

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ public Template getTemplate() {
return this.template;
}

/**
* Whether transactions are enabled for either the template or the listener.
* @return whether transactions are enabled for either the template or the listener
*/
public boolean isTransactionEnabled() {
return this.template.getTransaction().isEnabled() || this.listener.getTransaction().isEnabled();
}

public static class Client {

/**
Expand Down Expand Up @@ -763,6 +771,11 @@ public static class Listener {
*/
private boolean observationEnabled;

/**
* Transaction settings.
*/
private final Transaction transaction = new Transaction();

public SchemaType getSchemaType() {
return this.schemaType;
}
Expand All @@ -779,6 +792,10 @@ public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}

public Transaction getTransaction() {
return this.transaction;
}

}

public static class Reader {
Expand Down Expand Up @@ -858,6 +875,11 @@ public static class Template {
*/
private boolean observationsEnabled;

/**
* Transaction settings.
*/
private final Transaction transaction = new Transaction();

public boolean isObservationsEnabled() {
return this.observationsEnabled;
}
Expand All @@ -866,6 +888,54 @@ public void setObservationsEnabled(boolean observationsEnabled) {
this.observationsEnabled = observationsEnabled;
}

public Transaction getTransaction() {
return this.transaction;
}

}

public static class Transaction {

/**
* Whether the component supports transactions.
*/
private boolean enabled;

/**
* Whether the component requires transactions.
*/
private boolean required;

/**
* Duration representing the transaction timeout - null to use default timeout of
* the underlying transaction system, or none if timeouts are not supported.
*/
private Duration timeout;

public boolean isEnabled() {
return this.enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public boolean isRequired() {
return this.required;
}

public void setRequired(boolean required) {
this.required = required;
}

public Duration getTimeout() {
return this.timeout;
}

public void setTimeout(Duration timeout) {
this.timeout = timeout;
}

}

public static class Authentication {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.client.impl.AutoClusterFailover.AutoClusterFailoverBuilderImpl;

import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
import org.springframework.util.StringUtils;
Expand All @@ -64,6 +65,10 @@ void customizeClientBuilder(ClientBuilder clientBuilder, PulsarConnectionDetails
map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));
if (this.properties.getTemplate().getTransaction().isEnabled()
|| this.properties.getListener().getTransaction().isEnabled()) {
clientBuilder.enableTransaction(true);
}
customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication);
customizeServiceUrlProviderBuilder(clientBuilder::serviceUrl, clientBuilder::serviceUrlProvider, properties,
connectionDetails);
Expand Down Expand Up @@ -157,6 +162,14 @@ <T> void customizeProducerBuilder(ProducerBuilder<T> producerBuilder) {
map.from(properties::getAccessMode).to(producerBuilder::accessMode);
}

<T> void customizeTemplate(PulsarTemplate<T> template) {
PulsarProperties.Transaction properties = this.properties.getTemplate().getTransaction();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::isEnabled).to(template.transactions()::setEnabled);
map.from(properties::isRequired).to(template.transactions()::setRequired);
map.from(properties::getTimeout).to(template.transactions()::setTimeout);
}

<T> void customizeConsumerBuilder(ConsumerBuilder<T> consumerBuilder) {
PulsarProperties.Consumer properties = this.properties.getConsumer();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
Expand All @@ -183,6 +196,7 @@ private void customizeConsumerBuilderSubscription(ConsumerBuilder<?> consumerBui
void customizeContainerProperties(PulsarContainerProperties containerProperties) {
customizePulsarContainerConsumerSubscriptionProperties(containerProperties);
customizePulsarContainerListenerProperties(containerProperties);
customizePulsarContainerTransactionProperties(containerProperties);
}

private void customizePulsarContainerConsumerSubscriptionProperties(PulsarContainerProperties containerProperties) {
Expand All @@ -198,6 +212,14 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie
map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled);
}

private void customizePulsarContainerTransactionProperties(PulsarContainerProperties containerProperties) {
PulsarProperties.Transaction properties = this.properties.getListener().getTransaction();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::isEnabled).to(containerProperties.transactions()::setEnabled);
map.from(properties::isRequired).to(containerProperties.transactions()::setRequired);
map.from(properties::getTimeout).to(containerProperties.transactions()::setTimeout);
}

<T> void customizeReaderBuilder(ReaderBuilder<T> readerBuilder) {
PulsarProperties.Reader properties = this.properties.getReader();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
import org.springframework.test.util.ReflectionTestUtils;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -525,6 +526,24 @@ void whenVirtualThreadsAreEnabledOnJava20AndEarlierListenerContainerShouldNotUse
});
}

@Test
void whenTransactionManagerIsAvailableListenerContainerShouldUseTransactionManager() {
this.contextRunner.withPropertyValues("spring.pulsar.listener.transaction.enabled=true").run((context) -> {
ConcurrentPulsarListenerContainerFactory<?> factory = context
.getBean(ConcurrentPulsarListenerContainerFactory.class);
assertThat(factory.getContainerProperties().transactions().getTransactionManager()).isNotNull();
});
}

@Test
void whenTransactionManagerIsNotAvailableListenerContainerShouldNotUseTransactionManager() {
this.contextRunner.withPropertyValues("spring.pulsar.listener.transaction.enabled=false").run((context) -> {
ConcurrentPulsarListenerContainerFactory<?> factory = context
.getBean(ConcurrentPulsarListenerContainerFactory.class);
assertThat(factory.getContainerProperties().transactions().getTransactionManager()).isNull();
});
}

}

@Nested
Expand Down Expand Up @@ -603,4 +622,49 @@ ReaderBuilderCustomizer<?> customizerBar() {

}

@Nested
class TransactionManagerTests {

private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner;

@Test
@SuppressWarnings("unchecked")
void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
PulsarAwareTransactionManager txnMgr = mock(PulsarAwareTransactionManager.class);
this.contextRunner.withBean("customTransactionManager", PulsarAwareTransactionManager.class, () -> txnMgr)
.run((context) -> assertThat(context).getBean(PulsarAwareTransactionManager.class).isSameAs(txnMgr));
}

@Test
void whenNoPropertiesSetDoesNotAutoconfigureBean() {
this.contextRunner
.run((context) -> assertThat(context).doesNotHaveBean(PulsarAwareTransactionManager.class));
}

@Test
void whenListenerAndTemplateDisablesTransactionsDoesNotAutoconfigureBean() {
this.contextRunner
.withPropertyValues("spring.pulsar.listener.transaction.enabled=false",
"spring.pulsar.template.transaction.enabled=false")
.run((context) -> assertThat(context).doesNotHaveBean(PulsarAwareTransactionManager.class));
}

@Test
void whenListenerEnablesTransactionsAutoconfiguresBean() {
this.contextRunner
.withPropertyValues("spring.pulsar.listener.transaction.enabled=true",
"spring.pulsar.template.transaction.enabled=false")
.run((context) -> assertThat(context).hasSingleBean(PulsarAwareTransactionManager.class));
}

@Test
void whenTemplateEnablesTransactionsAutoconfiguresBean() {
this.contextRunner
.withPropertyValues("spring.pulsar.listener.transaction.enabled=false",
"spring.pulsar.template.transaction.enabled=true")
.run((context) -> assertThat(context).hasSingleBean(PulsarAwareTransactionManager.class));
}

}

}
Loading