Skip to content

Add Spring Pulsar transaction support #40189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
import org.springframework.util.Assert;

/**
Expand All @@ -48,7 +48,7 @@
* @since 3.2.0
*/
@ConfigurationProperties("spring.pulsar")
public class PulsarProperties implements InitializingBean {
public class PulsarProperties {

private final Client client = new Client();

Expand Down Expand Up @@ -104,12 +104,6 @@ public Template getTemplate() {
return this.template;
}

@Override
public void afterPropertiesSet() throws Exception {
this.getTemplate().getTransaction().validate("spring.pulsar.template");
this.getListener().getTransaction().validate("spring.pulsar.listener");
}

/**
* Whether transactions are enabled for either the template or the listener.
* @return whether transactions are enabled for either the template or the listener
Expand Down Expand Up @@ -781,7 +775,7 @@ public static class Listener {
/**
* Transaction settings.
*/
private final Transaction transaction = new Transaction();
private final Transaction transaction = new ListenerTransaction();

public SchemaType getSchemaType() {
return this.schemaType;
Expand Down Expand Up @@ -885,7 +879,7 @@ public static class Template {
/**
* Transaction settings.
*/
private final Transaction transaction = new Transaction();
private final Transaction transaction = new TemplateTransaction();

public boolean isObservationsEnabled() {
return this.observationsEnabled;
Expand All @@ -901,7 +895,7 @@ public Transaction getTransaction() {

}

public static class Transaction {
public abstract static class Transaction {

/**
* Whether transactions are enabled for the component.
Expand Down Expand Up @@ -943,9 +937,39 @@ public void setTimeout(Duration timeout) {
this.timeout = timeout;
}

void validate(String prefix) {
Assert.state((this.enabled || !this.required), "If transactions are required "
+ "they must also be enabled - consult your '%s.transaction' properties.".formatted(prefix));
void validate() {
if (this.required && !this.enabled) {
String requiredProp = "%s.required".formatted(this.propertyPath());
String enabledProp = "%s.enabled".formatted(this.propertyPath());
throw new InvalidConfigurationPropertyValueException(requiredProp, this.required,
"Transactions must be enabled in order to be required. "
+ "Either set %s to 'true' or make transactions optional by setting %s to 'false'"
.formatted(enabledProp, requiredProp));
}
}

/**
* Gets the property path that the transaction properties are mapped to.
* @return the property path that the transaction properties are mapped to
*/
protected abstract String propertyPath();

}

static class TemplateTransaction extends Transaction {

@Override
protected String propertyPath() {
return "spring.pulsar.template.transaction";
}

}

static class ListenerTransaction extends Transaction {

@Override
protected String propertyPath() {
return "spring.pulsar.listener.transaction";
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ <T> void customizeProducerBuilder(ProducerBuilder<T> producerBuilder) {

<T> void customizeTemplate(PulsarTemplate<T> template) {
PulsarProperties.Transaction properties = this.properties.getTemplate().getTransaction();
properties.validate();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::isEnabled).to(template.transactions()::setEnabled);
map.from(properties::isRequired).to(template.transactions()::setRequired);
Expand Down Expand Up @@ -213,6 +214,7 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie

private void customizePulsarContainerTransactionProperties(PulsarContainerProperties containerProperties) {
PulsarProperties.Transaction properties = this.properties.getListener().getTransaction();
properties.validate();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::isEnabled).to(containerProperties.transactions()::setEnabled);
map.from(properties::isRequired).to(containerProperties.transactions()::setRequired);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,10 @@ void whenTemplateRequiresTransactionsThenTransactionsMustBeEnabled() {
.run((context) -> assertThat(context).hasFailed()
.getFailure()
.hasMessageEndingWith(
"If transactions are required they must also be enabled - consult your 'spring.pulsar.template.transaction' properties."));
"Property spring.pulsar.template.transaction.required with value 'true' is invalid: "
+ "Transactions must be enabled in order to be required. Either set "
+ "spring.pulsar.template.transaction.enabled to 'true' or make transactions "
+ "optional by setting spring.pulsar.template.transaction.required to 'false'"));
}

@Test
Expand All @@ -684,7 +687,10 @@ void whenListenerRequiresTransactionsThenTransactionsMustBeEnabled() {
.run((context) -> assertThat(context).hasFailed()
.getFailure()
.hasMessageEndingWith(
"If transactions are required they must also be enabled - consult your 'spring.pulsar.listener.transaction' properties."));
"Property spring.pulsar.listener.transaction.required with value 'true' is invalid: "
+ "Transactions must be enabled in order to be required. Either set "
+ "spring.pulsar.listener.transaction.enabled to 'true' or make transactions "
+ "optional by setting spring.pulsar.listener.transaction.required to 'false'"));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.HashingScheme;
Expand All @@ -33,9 +32,6 @@
import org.apache.pulsar.common.schema.SchemaType;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Defaults.SchemaInfo;
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Defaults.TypeMapping;
Expand Down