Skip to content

Add Spring Pulsar transaction support #40189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.util.Assert;
Expand All @@ -47,7 +48,7 @@
* @since 3.2.0
*/
@ConfigurationProperties("spring.pulsar")
public class PulsarProperties {
public class PulsarProperties implements InitializingBean {

private final Client client = new Client();

Expand Down Expand Up @@ -103,11 +104,17 @@ public Template getTemplate() {
return this.template;
}

@Override
public void afterPropertiesSet() throws Exception {
this.getTemplate().getTransaction().validate("spring.pulsar.template");
this.getListener().getTransaction().validate("spring.pulsar.listener");
}

/**
* Whether transactions are enabled for either the template or the listener.
* @return whether transactions are enabled for either the template or the listener
*/
public boolean isTransactionEnabled() {
boolean isTransactionEnabled() {
return this.template.getTransaction().isEnabled() || this.listener.getTransaction().isEnabled();
}

Expand Down Expand Up @@ -897,7 +904,7 @@ public Transaction getTransaction() {
public static class Transaction {

/**
* Whether the component supports transactions.
* Whether transactions are enabled for the component.
*/
private boolean enabled;

Expand Down Expand Up @@ -936,6 +943,11 @@ public void setTimeout(Duration timeout) {
this.timeout = timeout;
}

void validate(String prefix) {
Assert.state((this.enabled || !this.required), "If transactions are required "
+ "they must also be enabled - consult your '%s.transaction' properties.".formatted(prefix));
}

}

public static class Authentication {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ void customizeClientBuilder(ClientBuilder clientBuilder, PulsarConnectionDetails
map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));
if (this.properties.getTemplate().getTransaction().isEnabled()
|| this.properties.getListener().getTransaction().isEnabled()) {
if (this.properties.isTransactionEnabled()) {
clientBuilder.enableTransaction(true);
}
customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,28 @@ void whenTemplateEnablesTransactionsAutoconfiguresBean() {
.run((context) -> assertThat(context).hasSingleBean(PulsarAwareTransactionManager.class));
}

@Test
void whenTemplateRequiresTransactionsThenTransactionsMustBeEnabled() {
this.contextRunner
.withPropertyValues("spring.pulsar.template.transaction.required=true",
"spring.pulsar.template.transaction.enabled=false")
.run((context) -> assertThat(context).hasFailed()
.getFailure()
.hasMessageEndingWith(
"If transactions are required they must also be enabled - consult your 'spring.pulsar.template.transaction' properties."));
}

@Test
void whenListenerRequiresTransactionsThenTransactionsMustBeEnabled() {
this.contextRunner
.withPropertyValues("spring.pulsar.listener.transaction.required=true",
"spring.pulsar.listener.transaction.enabled=false")
.run((context) -> assertThat(context).hasFailed()
.getFailure()
.hasMessageEndingWith(
"If transactions are required they must also be enabled - consult your 'spring.pulsar.listener.transaction' properties."));
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -415,19 +415,39 @@ void bind() {
@Nested
class TransactionProperties {

@ParameterizedTest
@MethodSource
void transactionsEnabledTests(boolean listenerEnablesTransactions, boolean templateEnablesTransactions,
boolean shouldTransactionsBeEnabled) {
@Test
void transactionsEnabledWhenListenerAndTemplateBothEnabled() {
PulsarProperties properties = new PulsarProperties();
properties.getListener().getTransaction().setEnabled(true);
properties.getTemplate().getTransaction().setEnabled(true);
assertThat(properties.isTransactionEnabled()).isTrue();

}

@Test
void transactionsEnabledWhenListenerEnabledAndTemplateDisabled() {
PulsarProperties properties = new PulsarProperties();
properties.getListener().getTransaction().setEnabled(true);
properties.getTemplate().getTransaction().setEnabled(false);
assertThat(properties.isTransactionEnabled()).isTrue();

}

@Test
void transactionsEnabledWhenListenerDisabledAndTemplateEnabled() {
PulsarProperties properties = new PulsarProperties();
properties.getListener().getTransaction().setEnabled(listenerEnablesTransactions);
properties.getTemplate().getTransaction().setEnabled(templateEnablesTransactions);
assertThat(properties.isTransactionEnabled()).isEqualTo(shouldTransactionsBeEnabled);
properties.getListener().getTransaction().setEnabled(false);
properties.getTemplate().getTransaction().setEnabled(true);
assertThat(properties.isTransactionEnabled()).isTrue();

}

static Stream<Arguments> transactionsEnabledTests() {
return Stream.of(Arguments.arguments(true, true, true), Arguments.arguments(true, false, true),
Arguments.arguments(false, true, true), Arguments.arguments(false, false, false));
void transactionsDisabledWhenListenerAndTemplateBothDisabled() {
PulsarProperties properties = new PulsarProperties();
properties.getListener().getTransaction().setEnabled(false);
properties.getTemplate().getTransaction().setEnabled(false);
assertThat(properties.isTransactionEnabled()).isFalse();

}

}
Expand Down