Skip to content

Favor unchecked over checked exceptions when sending messages #548

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* Spring Pulsar specific {@link NestedRuntimeException} implementation.
*
* @author Soby Chacko
* @author Jonas Geiregat
*/
public class PulsarException extends NestedRuntimeException {

Expand All @@ -33,4 +34,8 @@ public PulsarException(String msg, Throwable cause) {
super(msg, cause);
}

public PulsarException(Exception exception) {
super(exception.getMessage(), exception.getCause());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.PulsarException;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -57,14 +58,22 @@ public DefaultPulsarClientFactory(PulsarClientBuilderCustomizer customizer) {
}

@Override
public PulsarClient createClient() throws PulsarClientException {
public PulsarClient createClient() {
if (this.useRestartableClient) {
this.logger.info(() -> "Using restartable client");
return new PulsarClientProxy(this.customizer);
}
var clientBuilder = PulsarClient.builder();
this.customizer.customize(clientBuilder);
return clientBuilder.build();
try {
return clientBuilder.build();
}
catch (PulsarException ex) {
throw ex;
}
catch (Exception ex) {
throw new PulsarException(PulsarClientException.unwrap(ex));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;

import org.springframework.lang.Nullable;
import org.springframework.pulsar.PulsarException;
import org.springframework.util.CollectionUtils;

/**
Expand All @@ -42,6 +43,7 @@
* @author Alexander Preuß
* @author Christophe Bornet
* @author Chris Bono
* @author Jonas Geiregat
*/
public class DefaultPulsarConsumerFactory<T> implements PulsarConsumerFactory<T> {

Expand All @@ -64,15 +66,23 @@ public DefaultPulsarConsumerFactory(PulsarClient pulsarClient,

@Override
public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String> topics,
@Nullable String subscriptionName, ConsumerBuilderCustomizer<T> customizer) throws PulsarClientException {
return createConsumer(schema, topics, subscriptionName, null,
customizer != null ? Collections.singletonList(customizer) : null);
@Nullable String subscriptionName, ConsumerBuilderCustomizer<T> customizer) {
try {
return createConsumer(schema, topics, subscriptionName, null,
customizer != null ? Collections.singletonList(customizer) : null);
}
catch (PulsarException ex) {
throw ex;
}
catch (Exception ex) {
throw new PulsarException(PulsarClientException.unwrap(ex));
}
}

@Override
public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String> topics,
@Nullable String subscriptionName, @Nullable Map<String, String> metadataProperties,
@Nullable List<ConsumerBuilderCustomizer<T>> customizers) throws PulsarClientException {
@Nullable List<ConsumerBuilderCustomizer<T>> customizers) {
Objects.requireNonNull(schema, "Schema must be specified");
ConsumerBuilder<T> consumerBuilder = this.pulsarClient.newConsumer(schema);

Expand All @@ -92,7 +102,15 @@ public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String>
if (!CollectionUtils.isEmpty(customizers)) {
customizers.forEach(customizer -> customizer.customize(consumerBuilder));
}
return consumerBuilder.subscribe();
try {
return consumerBuilder.subscribe();
}
catch (PulsarException ex) {
throw ex;
}
catch (Exception ex) {
throw new PulsarException(PulsarClientException.unwrap(ex));
}
}

private void replaceTopicsOnBuilder(ConsumerBuilder<T> builder, Collection<String> topics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.PulsarException;
import org.springframework.util.CollectionUtils;

/**
Expand Down Expand Up @@ -102,21 +103,45 @@ public DefaultPulsarProducerFactory(PulsarClient pulsarClient, @Nullable String
}

@Override
public Producer<T> createProducer(Schema<T> schema, @Nullable String topic) throws PulsarClientException {
return doCreateProducer(schema, topic, null, null);
public Producer<T> createProducer(Schema<T> schema, @Nullable String topic) {
try {
return doCreateProducer(schema, topic, null, null);
}
catch (PulsarException ex) {
throw ex;
}
catch (Exception ex) {
throw new PulsarException(PulsarClientException.unwrap(ex));
}
}

@Override
public Producer<T> createProducer(Schema<T> schema, @Nullable String topic,
@Nullable ProducerBuilderCustomizer<T> customizer) throws PulsarClientException {
return doCreateProducer(schema, topic, null, customizer != null ? Collections.singletonList(customizer) : null);
@Nullable ProducerBuilderCustomizer<T> customizer) {
try {
return doCreateProducer(schema, topic, null,
customizer != null ? Collections.singletonList(customizer) : null);
}
catch (PulsarException ex) {
throw ex;
}
catch (Exception ex) {
throw new PulsarException(PulsarClientException.unwrap(ex));
}
}

@Override
public Producer<T> createProducer(Schema<T> schema, @Nullable String topic,
@Nullable Collection<String> encryptionKeys, @Nullable List<ProducerBuilderCustomizer<T>> customizers)
throws PulsarClientException {
return doCreateProducer(schema, topic, encryptionKeys, customizers);
@Nullable Collection<String> encryptionKeys, @Nullable List<ProducerBuilderCustomizer<T>> customizers) {
try {
return doCreateProducer(schema, topic, encryptionKeys, customizers);
}
catch (PulsarException ex) {
throw ex;
}
catch (Exception ex) {
throw new PulsarException(PulsarClientException.unwrap(ex));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package org.springframework.pulsar.core;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

import org.springframework.pulsar.PulsarException;

/**
* Pulsar client factory interface.
Expand All @@ -30,8 +31,8 @@ public interface PulsarClientFactory {
/**
* Create a client.
* @return the created client instance
* @throws PulsarClientException if an error occurs creating the client
* @throws PulsarException if an error occurs creating the client
*/
PulsarClient createClient() throws PulsarClientException;
PulsarClient createClient();

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

import org.springframework.lang.Nullable;
import org.springframework.pulsar.PulsarException;

/**
* Pulsar consumer factory interface.
Expand All @@ -34,6 +34,7 @@
* @author Soby Chacko
* @author Christophe Bornet
* @author Chris Bono
* @author Jonas Geiregat
*/
public interface PulsarConsumerFactory<T> {

Expand All @@ -53,10 +54,10 @@ public interface PulsarConsumerFactory<T> {
* that the customizer is applied last and has the potential for overriding any
* specified parameters or default properties.
* @return the consumer
* @throws PulsarClientException if any error occurs
* @throws PulsarException if any error occurs
*/
Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String> topics, @Nullable String subscriptionName,
ConsumerBuilderCustomizer<T> customizer) throws PulsarClientException;
ConsumerBuilderCustomizer<T> customizer);

/**
* Create a consumer.
Expand All @@ -79,10 +80,9 @@ Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String> topics
* builder. Note that the customizers are applied last and have the potential for
* overriding any specified parameters or default properties.
* @return the consumer
* @throws PulsarClientException if any error occurs
* @throws PulsarException if any error occurs
*/
Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String> topics, @Nullable String subscriptionName,
@Nullable Map<String, String> metadataProperties, @Nullable List<ConsumerBuilderCustomizer<T>> customizers)
throws PulsarClientException;
@Nullable Map<String, String> metadataProperties, @Nullable List<ConsumerBuilderCustomizer<T>> customizers);

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import org.apache.pulsar.client.api.Schema;

import org.springframework.lang.Nullable;
import org.springframework.pulsar.PulsarException;

/**
* The basic Pulsar operations contract.
*
* @param <T> the message payload type
* @author Chris Bono
* @author Alexander Preuß
* @author Jonas Geiregat
*/
public interface PulsarOperations<T> {

Expand All @@ -40,7 +42,7 @@ public interface PulsarOperations<T> {
* @return the id assigned by the broker to the published message
* @throws PulsarClientException if an error occurs
*/
MessageId send(@Nullable T message) throws PulsarClientException;
MessageId send(@Nullable T message) throws PulsarException;

/**
* Sends a message to the default topic in a blocking manner.
Expand All @@ -50,7 +52,7 @@ public interface PulsarOperations<T> {
* @return the id assigned by the broker to the published message
* @throws PulsarClientException if an error occurs
*/
MessageId send(@Nullable T message, @Nullable Schema<T> schema) throws PulsarClientException;
MessageId send(@Nullable T message, @Nullable Schema<T> schema) throws PulsarException;

/**
* Sends a message to the specified topic in a blocking manner.
Expand All @@ -60,7 +62,7 @@ public interface PulsarOperations<T> {
* @return the id assigned by the broker to the published message
* @throws PulsarClientException if an error occurs
*/
MessageId send(@Nullable String topic, @Nullable T message) throws PulsarClientException;
MessageId send(@Nullable String topic, @Nullable T message) throws PulsarException;

/**
* Sends a message to the specified topic in a blocking manner.
Expand All @@ -72,16 +74,15 @@ public interface PulsarOperations<T> {
* @return the id assigned by the broker to the published message
* @throws PulsarClientException if an error occurs
*/
MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema)
throws PulsarClientException;
MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema) throws PulsarException;

/**
* Sends a message to the default topic in a non-blocking manner.
* @param message the message to send
* @return a future that holds the id assigned by the broker to the published message
* @throws PulsarClientException if an error occurs
*/
CompletableFuture<MessageId> sendAsync(@Nullable T message) throws PulsarClientException;
CompletableFuture<MessageId> sendAsync(@Nullable T message) throws PulsarException;

/**
* Sends a message to the default topic in a non-blocking manner.
Expand All @@ -91,8 +92,7 @@ MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema<T>
* @return a future that holds the id assigned by the broker to the published message
* @throws PulsarClientException if an error occurs
*/
CompletableFuture<MessageId> sendAsync(@Nullable T message, @Nullable Schema<T> schema)
throws PulsarClientException;
CompletableFuture<MessageId> sendAsync(@Nullable T message, @Nullable Schema<T> schema) throws PulsarException;

/**
* Sends a message to the specified topic in a non-blocking manner.
Expand All @@ -102,7 +102,7 @@ CompletableFuture<MessageId> sendAsync(@Nullable T message, @Nullable Schema<T>
* @return a future that holds the id assigned by the broker to the published message
* @throws PulsarClientException if an error occurs
*/
CompletableFuture<MessageId> sendAsync(@Nullable String topic, @Nullable T message) throws PulsarClientException;
CompletableFuture<MessageId> sendAsync(@Nullable String topic, @Nullable T message) throws PulsarException;

/**
* Sends a message to the specified topic in a non-blocking manner.
Expand All @@ -115,7 +115,7 @@ CompletableFuture<MessageId> sendAsync(@Nullable T message, @Nullable Schema<T>
* @throws PulsarClientException if an error occurs
*/
CompletableFuture<MessageId> sendAsync(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema)
throws PulsarClientException;
throws PulsarException;

/**
* Create a {@link SendMessageBuilder builder} for configuring and sending a message.
Expand Down Expand Up @@ -170,17 +170,17 @@ interface SendMessageBuilder<T> {
/**
* Send the message in a blocking manner using the configured specification.
* @return the id assigned by the broker to the published message
* @throws PulsarClientException if an error occurs
* @throws PulsarException if an error occurs
*/
MessageId send() throws PulsarClientException;
MessageId send() throws PulsarException;

/**
* Uses the configured specification to send the message in a non-blocking manner.
* @return a future that holds the id assigned by the broker to the published
* message
* @throws PulsarClientException if an error occurs
* @throws PulsarException if an error occurs
*/
CompletableFuture<MessageId> sendAsync() throws PulsarClientException;
CompletableFuture<MessageId> sendAsync() throws PulsarException;

}

Expand Down
Loading