diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/PulsarException.java b/spring-pulsar/src/main/java/org/springframework/pulsar/PulsarException.java index ac671e913..bbd28947a 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/PulsarException.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/PulsarException.java @@ -22,6 +22,7 @@ * Spring Pulsar specific {@link NestedRuntimeException} implementation. * * @author Soby Chacko + * @author Jonas Geiregat */ public class PulsarException extends NestedRuntimeException { @@ -33,4 +34,8 @@ public PulsarException(String msg, Throwable cause) { super(msg, cause); } + public PulsarException(Throwable cause) { + this(cause.getMessage(), cause); + } + } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java index bfd85debe..2aa9317a5 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java @@ -108,15 +108,10 @@ protected Producer doCreateProducer(Schema schema, @Nullable String topic, private Producer createCacheableProducer(Schema schema, String topic, @Nullable Collection encryptionKeys, @Nullable List> customizers) { - try { var producer = super.doCreateProducer(schema, topic, encryptionKeys, customizers); return new ProducerWithCloseCallback<>(producer, (p) -> this.logger.trace(() -> "Client closed producer %s but will skip actual closing" .formatted(ProducerUtils.formatProducer(producer)))); - } - catch (PulsarClientException ex) { - throw new RuntimeException(ex); - } } /** diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java index 6057f4358..d43238b45 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java @@ -22,6 +22,7 @@ import org.springframework.context.EnvironmentAware; import org.springframework.core.env.Environment; import org.springframework.core.log.LogAccessor; +import org.springframework.pulsar.PulsarException; import org.springframework.util.Assert; /** @@ -57,14 +58,19 @@ public DefaultPulsarClientFactory(PulsarClientBuilderCustomizer customizer) { } @Override - public PulsarClient createClient() throws PulsarClientException { + public PulsarClient createClient() { if (this.useRestartableClient) { this.logger.info(() -> "Using restartable client"); return new PulsarClientProxy(this.customizer); } var clientBuilder = PulsarClient.builder(); this.customizer.customize(clientBuilder); - return clientBuilder.build(); + try { + return clientBuilder.build(); + } + catch (PulsarClientException ex) { + throw new PulsarException(ex); + } } @Override diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java index 74374c6fd..03a3f402a 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java @@ -32,6 +32,7 @@ import org.apache.pulsar.client.impl.ConsumerBuilderImpl; import org.springframework.lang.Nullable; +import org.springframework.pulsar.PulsarException; import org.springframework.util.CollectionUtils; /** @@ -42,6 +43,7 @@ * @author Alexander Preuß * @author Christophe Bornet * @author Chris Bono + * @author Jonas Geiregat */ public class DefaultPulsarConsumerFactory implements PulsarConsumerFactory { @@ -64,15 +66,23 @@ public DefaultPulsarConsumerFactory(PulsarClient pulsarClient, @Override public Consumer createConsumer(Schema schema, @Nullable Collection topics, - @Nullable String subscriptionName, ConsumerBuilderCustomizer customizer) throws PulsarClientException { - return createConsumer(schema, topics, subscriptionName, null, - customizer != null ? Collections.singletonList(customizer) : null); + @Nullable String subscriptionName, ConsumerBuilderCustomizer customizer) { + try { + return createConsumer(schema, topics, subscriptionName, null, + customizer != null ? Collections.singletonList(customizer) : null); + } + catch (PulsarException ex) { + throw ex; + } + catch (Exception ex) { + throw new PulsarException(PulsarClientException.unwrap(ex)); + } } @Override public Consumer createConsumer(Schema schema, @Nullable Collection topics, @Nullable String subscriptionName, @Nullable Map metadataProperties, - @Nullable List> customizers) throws PulsarClientException { + @Nullable List> customizers) { Objects.requireNonNull(schema, "Schema must be specified"); ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer(schema); @@ -92,7 +102,12 @@ public Consumer createConsumer(Schema schema, @Nullable Collection if (!CollectionUtils.isEmpty(customizers)) { customizers.forEach(customizer -> customizer.customize(consumerBuilder)); } - return consumerBuilder.subscribe(); + try { + return consumerBuilder.subscribe(); + } + catch (PulsarClientException ex) { + throw new PulsarException(ex); + } } private void replaceTopicsOnBuilder(ConsumerBuilder builder, Collection topics) { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarProducerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarProducerFactory.java index 9ce288337..d6b1c1604 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarProducerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarProducerFactory.java @@ -31,6 +31,7 @@ import org.springframework.core.log.LogAccessor; import org.springframework.lang.Nullable; +import org.springframework.pulsar.PulsarException; import org.springframework.util.CollectionUtils; /** @@ -102,21 +103,37 @@ public DefaultPulsarProducerFactory(PulsarClient pulsarClient, @Nullable String } @Override - public Producer createProducer(Schema schema, @Nullable String topic) throws PulsarClientException { + public Producer createProducer(Schema schema, @Nullable String topic) { return doCreateProducer(schema, topic, null, null); } @Override public Producer createProducer(Schema schema, @Nullable String topic, - @Nullable ProducerBuilderCustomizer customizer) throws PulsarClientException { - return doCreateProducer(schema, topic, null, customizer != null ? Collections.singletonList(customizer) : null); + @Nullable ProducerBuilderCustomizer customizer) { + try { + return doCreateProducer(schema, topic, null, + customizer != null ? Collections.singletonList(customizer) : null); + } + catch (PulsarException ex) { + throw ex; + } + catch (Exception ex) { + throw new PulsarException(PulsarClientException.unwrap(ex)); + } } @Override public Producer createProducer(Schema schema, @Nullable String topic, - @Nullable Collection encryptionKeys, @Nullable List> customizers) - throws PulsarClientException { - return doCreateProducer(schema, topic, encryptionKeys, customizers); + @Nullable Collection encryptionKeys, @Nullable List> customizers) { + try { + return doCreateProducer(schema, topic, encryptionKeys, customizers); + } + catch (PulsarException ex) { + throw ex; + } + catch (Exception ex) { + throw new PulsarException(PulsarClientException.unwrap(ex)); + } } /** @@ -134,8 +151,7 @@ public Producer createProducer(Schema schema, @Nullable String topic, * @throws PulsarClientException if any error occurs */ protected Producer doCreateProducer(Schema schema, @Nullable String topic, - @Nullable Collection encryptionKeys, @Nullable List> customizers) - throws PulsarClientException { + @Nullable Collection encryptionKeys, @Nullable List> customizers) { Objects.requireNonNull(schema, "Schema must be specified"); var resolvedTopic = resolveTopicName(topic); this.logger.trace(() -> "Creating producer for '%s' topic".formatted(resolvedTopic)); @@ -156,7 +172,12 @@ protected Producer doCreateProducer(Schema schema, @Nullable String topic, } producerBuilder.topic(resolvedTopic); - return producerBuilder.create(); + try { + return producerBuilder.create(); + } + catch (PulsarClientException ex) { + throw new PulsarException(ex); + } } protected String resolveTopicName(String userSpecifiedTopic) { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarClientFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarClientFactory.java index 7d675bedb..3e09349de 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarClientFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarClientFactory.java @@ -17,7 +17,8 @@ package org.springframework.pulsar.core; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; + +import org.springframework.pulsar.PulsarException; /** * Pulsar client factory interface. @@ -30,8 +31,8 @@ public interface PulsarClientFactory { /** * Create a client. * @return the created client instance - * @throws PulsarClientException if an error occurs creating the client + * @throws PulsarException if an error occurs creating the client */ - PulsarClient createClient() throws PulsarClientException; + PulsarClient createClient(); } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarConsumerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarConsumerFactory.java index ec2b529f0..f36d19f72 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarConsumerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarConsumerFactory.java @@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.Schema; import org.springframework.lang.Nullable; +import org.springframework.pulsar.PulsarException; /** * Pulsar consumer factory interface. @@ -34,6 +35,7 @@ * @author Soby Chacko * @author Christophe Bornet * @author Chris Bono + * @author Jonas Geiregat */ public interface PulsarConsumerFactory { @@ -53,10 +55,11 @@ public interface PulsarConsumerFactory { * that the customizer is applied last and has the potential for overriding any * specified parameters or default properties. * @return the consumer - * @throws PulsarClientException if any error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ Consumer createConsumer(Schema schema, @Nullable Collection topics, @Nullable String subscriptionName, - ConsumerBuilderCustomizer customizer) throws PulsarClientException; + ConsumerBuilderCustomizer customizer); /** * Create a consumer. @@ -79,10 +82,10 @@ Consumer createConsumer(Schema schema, @Nullable Collection topics * builder. Note that the customizers are applied last and have the potential for * overriding any specified parameters or default properties. * @return the consumer - * @throws PulsarClientException if any error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ Consumer createConsumer(Schema schema, @Nullable Collection topics, @Nullable String subscriptionName, - @Nullable Map metadataProperties, @Nullable List> customizers) - throws PulsarClientException; + @Nullable Map metadataProperties, @Nullable List> customizers); } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java index 636ef68e5..a43d85ffa 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java @@ -24,6 +24,7 @@ import org.apache.pulsar.client.api.Schema; import org.springframework.lang.Nullable; +import org.springframework.pulsar.PulsarException; /** * The basic Pulsar operations contract. @@ -31,6 +32,7 @@ * @param the message payload type * @author Chris Bono * @author Alexander Preuß + * @author Jonas Geiregat */ public interface PulsarOperations { @@ -38,9 +40,10 @@ public interface PulsarOperations { * Sends a message to the default topic in a blocking manner. * @param message the message to send * @return the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ - MessageId send(@Nullable T message) throws PulsarClientException; + MessageId send(@Nullable T message); /** * Sends a message to the default topic in a blocking manner. @@ -48,9 +51,10 @@ public interface PulsarOperations { * @param schema the schema to use or {@code null} to send using the default schema * resolution * @return the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ - MessageId send(@Nullable T message, @Nullable Schema schema) throws PulsarClientException; + MessageId send(@Nullable T message, @Nullable Schema schema); /** * Sends a message to the specified topic in a blocking manner. @@ -58,9 +62,10 @@ public interface PulsarOperations { * default topic * @param message the message to send * @return the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ - MessageId send(@Nullable String topic, @Nullable T message) throws PulsarClientException; + MessageId send(@Nullable String topic, @Nullable T message); /** * Sends a message to the specified topic in a blocking manner. @@ -70,18 +75,19 @@ public interface PulsarOperations { * @param schema the schema to use or {@code null} to send using the default schema * resolution * @return the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ - MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema schema) - throws PulsarClientException; + MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema schema); /** * Sends a message to the default topic in a non-blocking manner. * @param message the message to send * @return a future that holds the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ - CompletableFuture sendAsync(@Nullable T message) throws PulsarClientException; + CompletableFuture sendAsync(@Nullable T message); /** * Sends a message to the default topic in a non-blocking manner. @@ -89,10 +95,10 @@ MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema * @param schema the schema to use or {@code null} to send using the default schema * resolution * @return a future that holds the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ - CompletableFuture sendAsync(@Nullable T message, @Nullable Schema schema) - throws PulsarClientException; + CompletableFuture sendAsync(@Nullable T message, @Nullable Schema schema); /** * Sends a message to the specified topic in a non-blocking manner. @@ -100,9 +106,10 @@ CompletableFuture sendAsync(@Nullable T message, @Nullable Schema * default topic * @param message the message to send * @return a future that holds the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ - CompletableFuture sendAsync(@Nullable String topic, @Nullable T message) throws PulsarClientException; + CompletableFuture sendAsync(@Nullable String topic, @Nullable T message); /** * Sends a message to the specified topic in a non-blocking manner. @@ -112,10 +119,11 @@ CompletableFuture sendAsync(@Nullable T message, @Nullable Schema * @param schema the schema to use or {@code null} to send using the default schema * resolution * @return a future that holds the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ CompletableFuture sendAsync(@Nullable String topic, @Nullable T message, @Nullable Schema schema) - throws PulsarClientException; + throws PulsarException; /** * Create a {@link SendMessageBuilder builder} for configuring and sending a message. @@ -170,17 +178,17 @@ interface SendMessageBuilder { /** * Send the message in a blocking manner using the configured specification. * @return the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if an error occurs */ - MessageId send() throws PulsarClientException; + MessageId send(); /** * Uses the configured specification to send the message in a non-blocking manner. * @return a future that holds the id assigned by the broker to the published * message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if an error occurs */ - CompletableFuture sendAsync() throws PulsarClientException; + CompletableFuture sendAsync(); } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarProducerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarProducerFactory.java index 3fb6a1ef0..9d0cd69b3 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarProducerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarProducerFactory.java @@ -21,10 +21,10 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.springframework.lang.Nullable; +import org.springframework.pulsar.PulsarException; /** * The strategy to create a {@link Producer} instance(s). @@ -34,6 +34,7 @@ * @author Chris Bono * @author Alexander Preuß * @author Christophe Bornet + * @author Jonas Geiregat */ public interface PulsarProducerFactory { @@ -43,9 +44,9 @@ public interface PulsarProducerFactory { * @param topic the topic the producer will send messages to or {@code null} to use * the default topic * @return the producer - * @throws PulsarClientException if any error occurs + * @throws PulsarException if any error occurs */ - Producer createProducer(Schema schema, @Nullable String topic) throws PulsarClientException; + Producer createProducer(Schema schema, @Nullable String topic); /** * Create a producer. @@ -54,10 +55,10 @@ public interface PulsarProducerFactory { * the default topic * @param customizer the optional customizer to apply to the producer builder * @return the producer - * @throws PulsarClientException if any error occurs + * @throws PulsarException if any error occurs */ Producer createProducer(Schema schema, @Nullable String topic, - @Nullable ProducerBuilderCustomizer customizer) throws PulsarClientException; + @Nullable ProducerBuilderCustomizer customizer); /** * Create a producer. @@ -71,10 +72,10 @@ Producer createProducer(Schema schema, @Nullable String topic, * @param customizers the optional list of customizers to apply to the producer * builder * @return the producer - * @throws PulsarClientException if any error occurs + * @throws PulsarException if any error occurs */ Producer createProducer(Schema schema, @Nullable String topic, @Nullable Collection encryptionKeys, - @Nullable List> customizers) throws PulsarClientException; + @Nullable List> customizers); /** * Get the default topic to use for all created producers. diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java index 963f5fd2e..563748e55 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java @@ -36,6 +36,8 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.core.log.LogAccessor; import org.springframework.lang.Nullable; +import org.springframework.pulsar.PulsarException; +import org.springframework.pulsar.core.PulsarOperations.SendMessageBuilder; import org.springframework.pulsar.observation.DefaultPulsarTemplateObservationConvention; import org.springframework.pulsar.observation.PulsarMessageSenderContext; import org.springframework.pulsar.observation.PulsarTemplateObservation; @@ -53,6 +55,7 @@ * @author Chris Bono * @author Alexander Preuß * @author Christophe Bornet + * @author Jonas Geiregat */ public class PulsarTemplate implements PulsarOperations, ApplicationContextAware, BeanNameAware, SmartInitializingSingleton { @@ -151,46 +154,43 @@ public void afterSingletonsInstantiated() { } @Override - public MessageId send(@Nullable T message) throws PulsarClientException { + public MessageId send(@Nullable T message) { return doSend(null, message, null, null, null, null); } @Override - public MessageId send(@Nullable T message, @Nullable Schema schema) throws PulsarClientException { + public MessageId send(@Nullable T message, @Nullable Schema schema) { return doSend(null, message, schema, null, null, null); } @Override - public MessageId send(@Nullable String topic, @Nullable T message) throws PulsarClientException { + public MessageId send(@Nullable String topic, @Nullable T message) { return doSend(topic, message, null, null, null, null); } @Override - public MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema schema) - throws PulsarClientException { + public MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema schema) { return doSend(topic, message, schema, null, null, null); } @Override - public CompletableFuture sendAsync(@Nullable T message) throws PulsarClientException { + public CompletableFuture sendAsync(@Nullable T message) { return doSendAsync(null, message, null, null, null, null); } @Override - public CompletableFuture sendAsync(@Nullable T message, @Nullable Schema schema) - throws PulsarClientException { + public CompletableFuture sendAsync(@Nullable T message, @Nullable Schema schema) { return doSendAsync(null, message, schema, null, null, null); } @Override - public CompletableFuture sendAsync(@Nullable String topic, @Nullable T message) - throws PulsarClientException { + public CompletableFuture sendAsync(@Nullable String topic, @Nullable T message) { return doSendAsync(topic, message, null, null, null, null); } @Override public CompletableFuture sendAsync(@Nullable String topic, @Nullable T message, - @Nullable Schema schema) throws PulsarClientException { + @Nullable Schema schema) { return doSendAsync(topic, message, schema, null, null, null); } @@ -207,21 +207,24 @@ public void setBeanName(String beanName) { private MessageId doSend(@Nullable String topic, @Nullable T message, @Nullable Schema schema, @Nullable Collection encryptionKeys, @Nullable TypedMessageBuilderCustomizer typedMessageBuilderCustomizer, - @Nullable ProducerBuilderCustomizer producerCustomizer) throws PulsarClientException { + @Nullable ProducerBuilderCustomizer producerCustomizer) { try { return doSendAsync(topic, message, schema, encryptionKeys, typedMessageBuilderCustomizer, producerCustomizer) .get(); } + catch (PulsarException ex) { + throw ex; + } catch (Exception ex) { - throw PulsarClientException.unwrap(ex); + throw new PulsarException(PulsarClientException.unwrap(ex)); } } private CompletableFuture doSendAsync(@Nullable String topic, @Nullable T message, @Nullable Schema schema, @Nullable Collection encryptionKeys, @Nullable TypedMessageBuilderCustomizer typedMessageBuilderCustomizer, - @Nullable ProducerBuilderCustomizer producerCustomizer) throws PulsarClientException { + @Nullable ProducerBuilderCustomizer producerCustomizer) { String defaultTopic = Objects.toString(this.producerFactory.getDefaultTopic(), null); String topicName = this.topicResolver.resolveTopic(topic, message, () -> defaultTopic).orElseThrow(); this.logger.trace(() -> "Sending msg to '%s' topic".formatted(topicName)); @@ -241,9 +244,9 @@ private CompletableFuture doSendAsync(@Nullable String topic, @Nullab // propagate props to message senderContext.properties().forEach(messageBuilder::property); } - catch (Exception e) { + catch (RuntimeException ex) { ProducerUtils.closeProducerAsync(producer, this.logger); - throw e; + throw ex; } return messageBuilder.sendAsync().whenComplete((msgId, ex) -> { if (ex == null) { @@ -274,8 +277,7 @@ private Observation newObservation(PulsarMessageSenderContext senderContext) { } private Producer prepareProducerForSend(@Nullable String topic, @Nullable T message, @Nullable Schema schema, - @Nullable Collection encryptionKeys, @Nullable ProducerBuilderCustomizer producerCustomizer) - throws PulsarClientException { + @Nullable Collection encryptionKeys, @Nullable ProducerBuilderCustomizer producerCustomizer) { Schema resolvedSchema = schema == null ? this.schemaResolver.resolveSchema(message).orElseThrow() : schema; List> customizers = new ArrayList<>(); if (!CollectionUtils.isEmpty(this.interceptors)) { @@ -345,13 +347,13 @@ public SendMessageBuilder withProducerCustomizer(ProducerBuilderCustomizer } @Override - public MessageId send() throws PulsarClientException { + public MessageId send() { return this.template.doSend(this.topic, this.message, this.schema, this.encryptionKeys, this.messageCustomizer, this.producerCustomizer); } @Override - public CompletableFuture sendAsync() throws PulsarClientException { + public CompletableFuture sendAsync() { return this.template.doSendAsync(this.topic, this.message, this.schema, this.encryptionKeys, this.messageCustomizer, this.producerCustomizer); } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java index 3c599a82f..233a1befa 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java @@ -53,6 +53,7 @@ import org.springframework.core.log.LogAccessor; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.pulsar.PulsarException; import org.springframework.pulsar.core.ConsumerBuilderConfigurationUtil; import org.springframework.pulsar.core.ConsumerBuilderCustomizer; import org.springframework.pulsar.core.PulsarConsumerFactory; @@ -291,8 +292,8 @@ else if (messageListener != null) { updateSubscriptionTypeFromConsumer(this.consumer); } } - catch (PulsarClientException e) { - DefaultPulsarMessageListenerContainer.this.logger.error(e, () -> "Pulsar client exceptions."); + catch (PulsarException e) { + DefaultPulsarMessageListenerContainer.this.logger.error(e, () -> "Pulsar exception."); } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarDeadLetterPublishingRecoverer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarDeadLetterPublishingRecoverer.java index c892c8afa..b1ba66f4e 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarDeadLetterPublishingRecoverer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarDeadLetterPublishingRecoverer.java @@ -20,9 +20,9 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.PulsarClientException; import org.springframework.core.log.LogAccessor; +import org.springframework.pulsar.PulsarException; import org.springframework.pulsar.core.PulsarOperations; /** @@ -70,7 +70,7 @@ public PulsarMessageRecoverer recovererForConsumer(Consumer consumer) { exception.getCause() != null ? exception.getCause().getMessage() : exception.getMessage())) .sendAsync(); } - catch (PulsarClientException e) { + catch (PulsarException e) { this.logger.error(e, "DLT publishing failed."); } }; diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarClientFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarClientFactoryTests.java index 1a94bdac6..6e8eb934d 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarClientFactoryTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarClientFactoryTests.java @@ -20,7 +20,6 @@ import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assertions.assertThatRuntimeException; -import org.apache.pulsar.client.api.PulsarClientException; import org.junit.jupiter.api.Test; import org.springframework.mock.env.MockEnvironment; @@ -29,18 +28,19 @@ * Tests for {@link DefaultPulsarClientFactory}. * * @author Chris Bono + * @author Jonas Geiregat */ class DefaultPulsarClientFactoryTests { @Test - void constructWithServiceUrl() throws PulsarClientException { + void constructWithServiceUrl() { var clientFactory = new DefaultPulsarClientFactory("pulsar://localhost:5150"); assertThat(clientFactory.createClient()).hasFieldOrPropertyWithValue("conf.serviceUrl", "pulsar://localhost:5150"); } @Test - void constructWithCustomizer() throws PulsarClientException { + void constructWithCustomizer() { var clientFactory = new DefaultPulsarClientFactory( (clientBuilder) -> clientBuilder.serviceUrl("pulsar://localhost:5150")); assertThat(clientFactory.createClient()).hasFieldOrPropertyWithValue("conf.serviceUrl", @@ -63,14 +63,14 @@ void customizerThrowsException() { } @Test - void createsRestartableClientByDefault() throws PulsarClientException { + void createsRestartableClientByDefault() { var clientFactory = new DefaultPulsarClientFactory("pulsar://localhost:5150"); clientFactory.setEnvironment(new MockEnvironment()); assertThat(clientFactory.createClient()).isInstanceOf(PulsarClientProxy.class); } @Test - void createsRestartableClientWhenPropertySetTrue() throws PulsarClientException { + void createsRestartableClientWhenPropertySetTrue() { var clientFactory = new DefaultPulsarClientFactory("pulsar://localhost:5150"); var env = new MockEnvironment().withProperty("spring.pulsar.client.restartable", "true"); clientFactory.setEnvironment(env); @@ -78,7 +78,7 @@ void createsRestartableClientWhenPropertySetTrue() throws PulsarClientException } @Test - void createsDefaultClientWhenPropertySetFalse() throws PulsarClientException { + void createsDefaultClientWhenPropertySetFalse() { var clientFactory = new DefaultPulsarClientFactory("pulsar://localhost:5150"); var env = new MockEnvironment().withProperty("spring.pulsar.client.restartable", "false"); clientFactory.setEnvironment(env); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java index e88520b4a..67bdd6165 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java @@ -62,6 +62,7 @@ * @author Chris Bono * @author Alexander Preuß * @author Christophe Bornet + * @author Jonas Geiregat */ class PulsarTemplateTests implements PulsarTestContainerSupport { diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderStartMessageIdTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderStartMessageIdTests.java index 4150cf9a6..9d6bf4155 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderStartMessageIdTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderStartMessageIdTests.java @@ -24,7 +24,6 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClientException; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -42,6 +41,7 @@ * * @author Soby Chacko * @author Chris Bono + * @author Jonas Geiregat */ public class PulsarReaderStartMessageIdTests extends PulsarReaderTestsBase { @@ -152,12 +152,7 @@ void listen(Message message) { public PulsarReaderReaderBuilderCustomizer myCustomizer(PulsarTemplate pulsarTemplate) { return cb -> { for (int i = 0; i < 10; i++) { - try { - messageIds[i] = pulsarTemplate.send("with-customizer-reader-topic", "hello john doe-"); - } - catch (PulsarClientException e) { - // Ignore - } + messageIds[i] = pulsarTemplate.send("with-customizer-reader-topic", "hello john doe-"); } cb.startMessageId(messageIds[4]); // the first message read is the one // after this message id.