From de11fccd21f3b4feb84136dd892c43c68c612444 Mon Sep 17 00:00:00 2001 From: JonasG Date: Sat, 20 Jan 2024 10:04:48 +0100 Subject: [PATCH 1/5] Favor unchecked over checked exceptions when sending messages --- .../pulsar/PulsarException.java | 5 +++ .../pulsar/core/PulsarTemplate.java | 41 ++++++++++--------- .../pulsar/core/PulsarTemplateTests.java | 10 +++-- .../PulsarReaderStartMessageIdTests.java | 9 +--- 4 files changed, 35 insertions(+), 30 deletions(-) diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/PulsarException.java b/spring-pulsar/src/main/java/org/springframework/pulsar/PulsarException.java index ac671e913..5904a9f1d 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/PulsarException.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/PulsarException.java @@ -22,6 +22,7 @@ * Spring Pulsar specific {@link NestedRuntimeException} implementation. * * @author Soby Chacko + * @author Jonas Geiregat */ public class PulsarException extends NestedRuntimeException { @@ -33,4 +34,8 @@ public PulsarException(String msg, Throwable cause) { super(msg, cause); } + public PulsarException(Exception exception) { + super(exception.getMessage(), exception.getCause()); + } + } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java index 963f5fd2e..3e78e9702 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java @@ -36,6 +36,7 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.core.log.LogAccessor; import org.springframework.lang.Nullable; +import org.springframework.pulsar.PulsarException; import org.springframework.pulsar.observation.DefaultPulsarTemplateObservationConvention; import org.springframework.pulsar.observation.PulsarMessageSenderContext; import org.springframework.pulsar.observation.PulsarTemplateObservation; @@ -53,6 +54,7 @@ * @author Chris Bono * @author Alexander Preuß * @author Christophe Bornet + * @author Jonas Geiregat */ public class PulsarTemplate implements PulsarOperations, ApplicationContextAware, BeanNameAware, SmartInitializingSingleton { @@ -151,46 +153,43 @@ public void afterSingletonsInstantiated() { } @Override - public MessageId send(@Nullable T message) throws PulsarClientException { + public MessageId send(@Nullable T message) { return doSend(null, message, null, null, null, null); } @Override - public MessageId send(@Nullable T message, @Nullable Schema schema) throws PulsarClientException { + public MessageId send(@Nullable T message, @Nullable Schema schema) { return doSend(null, message, schema, null, null, null); } @Override - public MessageId send(@Nullable String topic, @Nullable T message) throws PulsarClientException { + public MessageId send(@Nullable String topic, @Nullable T message) { return doSend(topic, message, null, null, null, null); } @Override - public MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema schema) - throws PulsarClientException { + public MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema schema) { return doSend(topic, message, schema, null, null, null); } @Override - public CompletableFuture sendAsync(@Nullable T message) throws PulsarClientException { + public CompletableFuture sendAsync(@Nullable T message) { return doSendAsync(null, message, null, null, null, null); } @Override - public CompletableFuture sendAsync(@Nullable T message, @Nullable Schema schema) - throws PulsarClientException { + public CompletableFuture sendAsync(@Nullable T message, @Nullable Schema schema) { return doSendAsync(null, message, schema, null, null, null); } @Override - public CompletableFuture sendAsync(@Nullable String topic, @Nullable T message) - throws PulsarClientException { + public CompletableFuture sendAsync(@Nullable String topic, @Nullable T message) { return doSendAsync(topic, message, null, null, null, null); } @Override public CompletableFuture sendAsync(@Nullable String topic, @Nullable T message, - @Nullable Schema schema) throws PulsarClientException { + @Nullable Schema schema) { return doSendAsync(topic, message, schema, null, null, null); } @@ -207,21 +206,21 @@ public void setBeanName(String beanName) { private MessageId doSend(@Nullable String topic, @Nullable T message, @Nullable Schema schema, @Nullable Collection encryptionKeys, @Nullable TypedMessageBuilderCustomizer typedMessageBuilderCustomizer, - @Nullable ProducerBuilderCustomizer producerCustomizer) throws PulsarClientException { + @Nullable ProducerBuilderCustomizer producerCustomizer) { try { return doSendAsync(topic, message, schema, encryptionKeys, typedMessageBuilderCustomizer, producerCustomizer) .get(); } catch (Exception ex) { - throw PulsarClientException.unwrap(ex); + throw new PulsarException(ex); } } private CompletableFuture doSendAsync(@Nullable String topic, @Nullable T message, @Nullable Schema schema, @Nullable Collection encryptionKeys, @Nullable TypedMessageBuilderCustomizer typedMessageBuilderCustomizer, - @Nullable ProducerBuilderCustomizer producerCustomizer) throws PulsarClientException { + @Nullable ProducerBuilderCustomizer producerCustomizer) { String defaultTopic = Objects.toString(this.producerFactory.getDefaultTopic(), null); String topicName = this.topicResolver.resolveTopic(topic, message, () -> defaultTopic).orElseThrow(); this.logger.trace(() -> "Sending msg to '%s' topic".formatted(topicName)); @@ -274,8 +273,7 @@ private Observation newObservation(PulsarMessageSenderContext senderContext) { } private Producer prepareProducerForSend(@Nullable String topic, @Nullable T message, @Nullable Schema schema, - @Nullable Collection encryptionKeys, @Nullable ProducerBuilderCustomizer producerCustomizer) - throws PulsarClientException { + @Nullable Collection encryptionKeys, @Nullable ProducerBuilderCustomizer producerCustomizer) { Schema resolvedSchema = schema == null ? this.schemaResolver.resolveSchema(message).orElseThrow() : schema; List> customizers = new ArrayList<>(); if (!CollectionUtils.isEmpty(this.interceptors)) { @@ -284,7 +282,12 @@ private Producer prepareProducerForSend(@Nullable String topic, @Nullable T m if (producerCustomizer != null) { customizers.add(producerCustomizer); } - return this.producerFactory.createProducer(resolvedSchema, topic, encryptionKeys, customizers); + try { + return this.producerFactory.createProducer(resolvedSchema, topic, encryptionKeys, customizers); + } + catch (PulsarClientException ex) { + throw new PulsarException(ex); + } } public static class SendMessageBuilderImpl implements SendMessageBuilder { @@ -345,13 +348,13 @@ public SendMessageBuilder withProducerCustomizer(ProducerBuilderCustomizer } @Override - public MessageId send() throws PulsarClientException { + public MessageId send() { return this.template.doSend(this.topic, this.message, this.schema, this.encryptionKeys, this.messageCustomizer, this.producerCustomizer); } @Override - public CompletableFuture sendAsync() throws PulsarClientException { + public CompletableFuture sendAsync() { return this.template.doSendAsync(this.topic, this.message, this.schema, this.encryptionKeys, this.messageCustomizer, this.producerCustomizer); } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java index e88520b4a..410e744e0 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java @@ -17,7 +17,7 @@ package org.springframework.pulsar.core; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; @@ -52,6 +52,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import org.springframework.pulsar.PulsarException; import org.springframework.pulsar.test.support.PulsarTestContainerSupport; import org.springframework.util.function.ThrowingConsumer; @@ -62,6 +63,7 @@ * @author Chris Bono * @author Alexander Preuß * @author Christophe Bornet + * @author Jonas Geiregat */ class PulsarTemplateTests implements PulsarTestContainerSupport { @@ -236,7 +238,7 @@ void sendMessageWithTopicInferredByTypeMappings(boolean producerFactoryHasDefaul void sendMessageWithoutTopicFails() { PulsarProducerFactory senderFactory = new DefaultPulsarProducerFactory<>(client); PulsarTemplate pulsarTemplate = new PulsarTemplate<>(senderFactory); - assertThatIllegalArgumentException().isThrownBy(() -> pulsarTemplate.send("test-message")) + assertThatExceptionOfType(PulsarException.class).isThrownBy(() -> pulsarTemplate.send("test-message")) .withMessage("Topic must be specified when no default topic is configured"); } @@ -306,7 +308,7 @@ void sendNullWithDefaultTopicFails() { PulsarProducerFactory senderFactory = new DefaultPulsarProducerFactory<>(client, "sendNullWithDefaultTopicFails"); PulsarTemplate pulsarTemplate = new PulsarTemplate<>(senderFactory); - assertThatIllegalArgumentException().isThrownBy(() -> pulsarTemplate.send(null, Schema.STRING)) + assertThatExceptionOfType(PulsarException.class).isThrownBy(() -> pulsarTemplate.send(null, Schema.STRING)) .withMessage("Topic must be specified when the message is null"); } @@ -314,7 +316,7 @@ void sendNullWithDefaultTopicFails() { void sendNullWithoutSchemaFails() { PulsarProducerFactory senderFactory = new DefaultPulsarProducerFactory<>(client); PulsarTemplate pulsarTemplate = new PulsarTemplate<>(senderFactory); - assertThatIllegalArgumentException() + assertThatExceptionOfType(PulsarException.class) .isThrownBy(() -> pulsarTemplate.send("sendNullWithoutSchemaFails", null, null)) .withMessage("Schema must be specified when the message is null"); } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderStartMessageIdTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderStartMessageIdTests.java index 4150cf9a6..9d6bf4155 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderStartMessageIdTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderStartMessageIdTests.java @@ -24,7 +24,6 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClientException; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -42,6 +41,7 @@ * * @author Soby Chacko * @author Chris Bono + * @author Jonas Geiregat */ public class PulsarReaderStartMessageIdTests extends PulsarReaderTestsBase { @@ -152,12 +152,7 @@ void listen(Message message) { public PulsarReaderReaderBuilderCustomizer myCustomizer(PulsarTemplate pulsarTemplate) { return cb -> { for (int i = 0; i < 10; i++) { - try { - messageIds[i] = pulsarTemplate.send("with-customizer-reader-topic", "hello john doe-"); - } - catch (PulsarClientException e) { - // Ignore - } + messageIds[i] = pulsarTemplate.send("with-customizer-reader-topic", "hello john doe-"); } cb.startMessageId(messageIds[4]); // the first message read is the one // after this message id. From 611e20fc6420b5aafaf1d2fcd6334b295e403522 Mon Sep 17 00:00:00 2001 From: JonasG Date: Mon, 22 Jan 2024 21:29:30 +0100 Subject: [PATCH 2/5] fix: unwrap none PulsarExceptions --- .../pulsar/core/PulsarTemplate.java | 16 ++++++++++------ .../pulsar/core/PulsarTemplateTests.java | 8 ++++---- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java index 3e78e9702..0e31f8797 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java @@ -212,8 +212,10 @@ private MessageId doSend(@Nullable String topic, @Nullable T message, @Nullable producerCustomizer) .get(); } - catch (Exception ex) { - throw new PulsarException(ex); + catch (PulsarException ex) { + throw ex; + } catch (Exception ex) { + throw new PulsarException(PulsarClientException.unwrap(ex)); } } @@ -240,9 +242,9 @@ private CompletableFuture doSendAsync(@Nullable String topic, @Nullab // propagate props to message senderContext.properties().forEach(messageBuilder::property); } - catch (Exception e) { + catch (RuntimeException ex) { ProducerUtils.closeProducerAsync(producer, this.logger); - throw e; + throw ex; } return messageBuilder.sendAsync().whenComplete((msgId, ex) -> { if (ex == null) { @@ -285,8 +287,10 @@ private Producer prepareProducerForSend(@Nullable String topic, @Nullable T m try { return this.producerFactory.createProducer(resolvedSchema, topic, encryptionKeys, customizers); } - catch (PulsarClientException ex) { - throw new PulsarException(ex); + catch (PulsarException ex) { + throw ex; + } catch (Exception ex) { + throw new PulsarException(PulsarClientException.unwrap(ex)); } } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java index 410e744e0..8b583951d 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java @@ -17,7 +17,7 @@ package org.springframework.pulsar.core; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; @@ -238,7 +238,7 @@ void sendMessageWithTopicInferredByTypeMappings(boolean producerFactoryHasDefaul void sendMessageWithoutTopicFails() { PulsarProducerFactory senderFactory = new DefaultPulsarProducerFactory<>(client); PulsarTemplate pulsarTemplate = new PulsarTemplate<>(senderFactory); - assertThatExceptionOfType(PulsarException.class).isThrownBy(() -> pulsarTemplate.send("test-message")) + assertThatIllegalArgumentException().isThrownBy(() -> pulsarTemplate.send("test-message")) .withMessage("Topic must be specified when no default topic is configured"); } @@ -308,7 +308,7 @@ void sendNullWithDefaultTopicFails() { PulsarProducerFactory senderFactory = new DefaultPulsarProducerFactory<>(client, "sendNullWithDefaultTopicFails"); PulsarTemplate pulsarTemplate = new PulsarTemplate<>(senderFactory); - assertThatExceptionOfType(PulsarException.class).isThrownBy(() -> pulsarTemplate.send(null, Schema.STRING)) + assertThatIllegalArgumentException().isThrownBy(() -> pulsarTemplate.send(null, Schema.STRING)) .withMessage("Topic must be specified when the message is null"); } @@ -316,7 +316,7 @@ void sendNullWithDefaultTopicFails() { void sendNullWithoutSchemaFails() { PulsarProducerFactory senderFactory = new DefaultPulsarProducerFactory<>(client); PulsarTemplate pulsarTemplate = new PulsarTemplate<>(senderFactory); - assertThatExceptionOfType(PulsarException.class) + assertThatIllegalArgumentException() .isThrownBy(() -> pulsarTemplate.send("sendNullWithoutSchemaFails", null, null)) .withMessage("Schema must be specified when the message is null"); } From 88336ea0839474523ba22da29db8fec89438309c Mon Sep 17 00:00:00 2001 From: JonasG Date: Mon, 22 Jan 2024 21:37:42 +0100 Subject: [PATCH 3/5] fix: align PulsarOperations with implementing class its exception handling behavior --- .../pulsar/core/PulsarOperations.java | 25 ++++++++++--------- .../pulsar/core/PulsarTemplate.java | 8 +++++- .../PulsarDeadLetterPublishingRecoverer.java | 3 ++- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java index 636ef68e5..b782f86e1 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java @@ -24,6 +24,7 @@ import org.apache.pulsar.client.api.Schema; import org.springframework.lang.Nullable; +import org.springframework.pulsar.PulsarException; /** * The basic Pulsar operations contract. @@ -40,7 +41,7 @@ public interface PulsarOperations { * @return the id assigned by the broker to the published message * @throws PulsarClientException if an error occurs */ - MessageId send(@Nullable T message) throws PulsarClientException; + MessageId send(@Nullable T message) throws PulsarException; /** * Sends a message to the default topic in a blocking manner. @@ -50,7 +51,7 @@ public interface PulsarOperations { * @return the id assigned by the broker to the published message * @throws PulsarClientException if an error occurs */ - MessageId send(@Nullable T message, @Nullable Schema schema) throws PulsarClientException; + MessageId send(@Nullable T message, @Nullable Schema schema) throws PulsarException; /** * Sends a message to the specified topic in a blocking manner. @@ -60,7 +61,7 @@ public interface PulsarOperations { * @return the id assigned by the broker to the published message * @throws PulsarClientException if an error occurs */ - MessageId send(@Nullable String topic, @Nullable T message) throws PulsarClientException; + MessageId send(@Nullable String topic, @Nullable T message) throws PulsarException; /** * Sends a message to the specified topic in a blocking manner. @@ -73,7 +74,7 @@ public interface PulsarOperations { * @throws PulsarClientException if an error occurs */ MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema schema) - throws PulsarClientException; + throws PulsarException; /** * Sends a message to the default topic in a non-blocking manner. @@ -81,7 +82,7 @@ MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema * @return a future that holds the id assigned by the broker to the published message * @throws PulsarClientException if an error occurs */ - CompletableFuture sendAsync(@Nullable T message) throws PulsarClientException; + CompletableFuture sendAsync(@Nullable T message) throws PulsarException; /** * Sends a message to the default topic in a non-blocking manner. @@ -92,7 +93,7 @@ MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema * @throws PulsarClientException if an error occurs */ CompletableFuture sendAsync(@Nullable T message, @Nullable Schema schema) - throws PulsarClientException; + throws PulsarException; /** * Sends a message to the specified topic in a non-blocking manner. @@ -102,7 +103,7 @@ CompletableFuture sendAsync(@Nullable T message, @Nullable Schema * @return a future that holds the id assigned by the broker to the published message * @throws PulsarClientException if an error occurs */ - CompletableFuture sendAsync(@Nullable String topic, @Nullable T message) throws PulsarClientException; + CompletableFuture sendAsync(@Nullable String topic, @Nullable T message) throws PulsarException; /** * Sends a message to the specified topic in a non-blocking manner. @@ -115,7 +116,7 @@ CompletableFuture sendAsync(@Nullable T message, @Nullable Schema * @throws PulsarClientException if an error occurs */ CompletableFuture sendAsync(@Nullable String topic, @Nullable T message, @Nullable Schema schema) - throws PulsarClientException; + throws PulsarException; /** * Create a {@link SendMessageBuilder builder} for configuring and sending a message. @@ -170,17 +171,17 @@ interface SendMessageBuilder { /** * Send the message in a blocking manner using the configured specification. * @return the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if an error occurs */ - MessageId send() throws PulsarClientException; + MessageId send() throws PulsarException; /** * Uses the configured specification to send the message in a non-blocking manner. * @return a future that holds the id assigned by the broker to the published * message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if an error occurs */ - CompletableFuture sendAsync() throws PulsarClientException; + CompletableFuture sendAsync() throws PulsarException; } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java index 0e31f8797..fb14809c3 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java @@ -37,6 +37,7 @@ import org.springframework.core.log.LogAccessor; import org.springframework.lang.Nullable; import org.springframework.pulsar.PulsarException; +import org.springframework.pulsar.core.PulsarOperations.SendMessageBuilder; import org.springframework.pulsar.observation.DefaultPulsarTemplateObservationConvention; import org.springframework.pulsar.observation.PulsarMessageSenderContext; import org.springframework.pulsar.observation.PulsarTemplateObservation; @@ -259,10 +260,15 @@ private CompletableFuture doSendAsync(@Nullable String topic, @Nullab ProducerUtils.closeProducerAsync(producer, this.logger); }); } - catch (RuntimeException ex) { + catch (PulsarException ex) { observation.error(ex); observation.stop(); throw ex; + } catch (Exception ex) { + var pulsarException = new PulsarException(PulsarClientException.unwrap(ex)); + observation.error(pulsarException); + observation.stop(); + throw pulsarException; } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarDeadLetterPublishingRecoverer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarDeadLetterPublishingRecoverer.java index c892c8afa..9bd9a1025 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarDeadLetterPublishingRecoverer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarDeadLetterPublishingRecoverer.java @@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.springframework.core.log.LogAccessor; +import org.springframework.pulsar.PulsarException; import org.springframework.pulsar.core.PulsarOperations; /** @@ -70,7 +71,7 @@ public PulsarMessageRecoverer recovererForConsumer(Consumer consumer) { exception.getCause() != null ? exception.getCause().getMessage() : exception.getMessage())) .sendAsync(); } - catch (PulsarClientException e) { + catch (PulsarException e) { this.logger.error(e, "DLT publishing failed."); } }; From 59df0e9cba8827524725653d5cc8dc2ed2ca1677 Mon Sep 17 00:00:00 2001 From: JonasG Date: Mon, 22 Jan 2024 22:08:25 +0100 Subject: [PATCH 4/5] feat: favor unchecked PulsarException over checked PulsarClientException --- .../core/DefaultPulsarClientFactory.java | 13 ++++++- .../core/DefaultPulsarConsumerFactory.java | 28 ++++++++++--- .../core/DefaultPulsarProducerFactory.java | 39 +++++++++++++++---- .../pulsar/core/PulsarClientFactory.java | 7 ++-- .../pulsar/core/PulsarConsumerFactory.java | 12 +++--- .../pulsar/core/PulsarOperations.java | 7 ++-- .../pulsar/core/PulsarProducerFactory.java | 15 +++---- .../pulsar/core/PulsarTemplate.java | 9 +++-- ...DefaultPulsarMessageListenerContainer.java | 5 ++- .../PulsarDeadLetterPublishingRecoverer.java | 1 - .../core/DefaultPulsarClientFactoryTests.java | 12 +++--- .../pulsar/core/PulsarTemplateTests.java | 1 - 12 files changed, 102 insertions(+), 47 deletions(-) diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java index 6057f4358..f84d10265 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java @@ -22,6 +22,7 @@ import org.springframework.context.EnvironmentAware; import org.springframework.core.env.Environment; import org.springframework.core.log.LogAccessor; +import org.springframework.pulsar.PulsarException; import org.springframework.util.Assert; /** @@ -57,14 +58,22 @@ public DefaultPulsarClientFactory(PulsarClientBuilderCustomizer customizer) { } @Override - public PulsarClient createClient() throws PulsarClientException { + public PulsarClient createClient() { if (this.useRestartableClient) { this.logger.info(() -> "Using restartable client"); return new PulsarClientProxy(this.customizer); } var clientBuilder = PulsarClient.builder(); this.customizer.customize(clientBuilder); - return clientBuilder.build(); + try { + return clientBuilder.build(); + } + catch (PulsarException ex) { + throw ex; + } + catch (Exception ex) { + throw new PulsarException(PulsarClientException.unwrap(ex)); + } } @Override diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java index 74374c6fd..9cbaf3b32 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java @@ -32,6 +32,7 @@ import org.apache.pulsar.client.impl.ConsumerBuilderImpl; import org.springframework.lang.Nullable; +import org.springframework.pulsar.PulsarException; import org.springframework.util.CollectionUtils; /** @@ -42,6 +43,7 @@ * @author Alexander Preuß * @author Christophe Bornet * @author Chris Bono + * @author Jonas Geiregat */ public class DefaultPulsarConsumerFactory implements PulsarConsumerFactory { @@ -64,15 +66,23 @@ public DefaultPulsarConsumerFactory(PulsarClient pulsarClient, @Override public Consumer createConsumer(Schema schema, @Nullable Collection topics, - @Nullable String subscriptionName, ConsumerBuilderCustomizer customizer) throws PulsarClientException { - return createConsumer(schema, topics, subscriptionName, null, - customizer != null ? Collections.singletonList(customizer) : null); + @Nullable String subscriptionName, ConsumerBuilderCustomizer customizer) { + try { + return createConsumer(schema, topics, subscriptionName, null, + customizer != null ? Collections.singletonList(customizer) : null); + } + catch (PulsarException ex) { + throw ex; + } + catch (Exception ex) { + throw new PulsarException(PulsarClientException.unwrap(ex)); + } } @Override public Consumer createConsumer(Schema schema, @Nullable Collection topics, @Nullable String subscriptionName, @Nullable Map metadataProperties, - @Nullable List> customizers) throws PulsarClientException { + @Nullable List> customizers) { Objects.requireNonNull(schema, "Schema must be specified"); ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer(schema); @@ -92,7 +102,15 @@ public Consumer createConsumer(Schema schema, @Nullable Collection if (!CollectionUtils.isEmpty(customizers)) { customizers.forEach(customizer -> customizer.customize(consumerBuilder)); } - return consumerBuilder.subscribe(); + try { + return consumerBuilder.subscribe(); + } + catch (PulsarException ex) { + throw ex; + } + catch (Exception ex) { + throw new PulsarException(PulsarClientException.unwrap(ex)); + } } private void replaceTopicsOnBuilder(ConsumerBuilder builder, Collection topics) { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarProducerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarProducerFactory.java index 9ce288337..911a6d1ce 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarProducerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarProducerFactory.java @@ -31,6 +31,7 @@ import org.springframework.core.log.LogAccessor; import org.springframework.lang.Nullable; +import org.springframework.pulsar.PulsarException; import org.springframework.util.CollectionUtils; /** @@ -102,21 +103,45 @@ public DefaultPulsarProducerFactory(PulsarClient pulsarClient, @Nullable String } @Override - public Producer createProducer(Schema schema, @Nullable String topic) throws PulsarClientException { - return doCreateProducer(schema, topic, null, null); + public Producer createProducer(Schema schema, @Nullable String topic) { + try { + return doCreateProducer(schema, topic, null, null); + } + catch (PulsarException ex) { + throw ex; + } + catch (Exception ex) { + throw new PulsarException(PulsarClientException.unwrap(ex)); + } } @Override public Producer createProducer(Schema schema, @Nullable String topic, - @Nullable ProducerBuilderCustomizer customizer) throws PulsarClientException { - return doCreateProducer(schema, topic, null, customizer != null ? Collections.singletonList(customizer) : null); + @Nullable ProducerBuilderCustomizer customizer) { + try { + return doCreateProducer(schema, topic, null, + customizer != null ? Collections.singletonList(customizer) : null); + } + catch (PulsarException ex) { + throw ex; + } + catch (Exception ex) { + throw new PulsarException(PulsarClientException.unwrap(ex)); + } } @Override public Producer createProducer(Schema schema, @Nullable String topic, - @Nullable Collection encryptionKeys, @Nullable List> customizers) - throws PulsarClientException { - return doCreateProducer(schema, topic, encryptionKeys, customizers); + @Nullable Collection encryptionKeys, @Nullable List> customizers) { + try { + return doCreateProducer(schema, topic, encryptionKeys, customizers); + } + catch (PulsarException ex) { + throw ex; + } + catch (Exception ex) { + throw new PulsarException(PulsarClientException.unwrap(ex)); + } } /** diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarClientFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarClientFactory.java index 7d675bedb..3e09349de 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarClientFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarClientFactory.java @@ -17,7 +17,8 @@ package org.springframework.pulsar.core; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; + +import org.springframework.pulsar.PulsarException; /** * Pulsar client factory interface. @@ -30,8 +31,8 @@ public interface PulsarClientFactory { /** * Create a client. * @return the created client instance - * @throws PulsarClientException if an error occurs creating the client + * @throws PulsarException if an error occurs creating the client */ - PulsarClient createClient() throws PulsarClientException; + PulsarClient createClient(); } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarConsumerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarConsumerFactory.java index ec2b529f0..82ed601a0 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarConsumerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarConsumerFactory.java @@ -22,10 +22,10 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.springframework.lang.Nullable; +import org.springframework.pulsar.PulsarException; /** * Pulsar consumer factory interface. @@ -34,6 +34,7 @@ * @author Soby Chacko * @author Christophe Bornet * @author Chris Bono + * @author Jonas Geiregat */ public interface PulsarConsumerFactory { @@ -53,10 +54,10 @@ public interface PulsarConsumerFactory { * that the customizer is applied last and has the potential for overriding any * specified parameters or default properties. * @return the consumer - * @throws PulsarClientException if any error occurs + * @throws PulsarException if any error occurs */ Consumer createConsumer(Schema schema, @Nullable Collection topics, @Nullable String subscriptionName, - ConsumerBuilderCustomizer customizer) throws PulsarClientException; + ConsumerBuilderCustomizer customizer); /** * Create a consumer. @@ -79,10 +80,9 @@ Consumer createConsumer(Schema schema, @Nullable Collection topics * builder. Note that the customizers are applied last and have the potential for * overriding any specified parameters or default properties. * @return the consumer - * @throws PulsarClientException if any error occurs + * @throws PulsarException if any error occurs */ Consumer createConsumer(Schema schema, @Nullable Collection topics, @Nullable String subscriptionName, - @Nullable Map metadataProperties, @Nullable List> customizers) - throws PulsarClientException; + @Nullable Map metadataProperties, @Nullable List> customizers); } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java index b782f86e1..4a370bfe0 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java @@ -32,6 +32,7 @@ * @param the message payload type * @author Chris Bono * @author Alexander Preuß + * @author Jonas Geiregat */ public interface PulsarOperations { @@ -73,8 +74,7 @@ public interface PulsarOperations { * @return the id assigned by the broker to the published message * @throws PulsarClientException if an error occurs */ - MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema schema) - throws PulsarException; + MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema schema) throws PulsarException; /** * Sends a message to the default topic in a non-blocking manner. @@ -92,8 +92,7 @@ MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema * @return a future that holds the id assigned by the broker to the published message * @throws PulsarClientException if an error occurs */ - CompletableFuture sendAsync(@Nullable T message, @Nullable Schema schema) - throws PulsarException; + CompletableFuture sendAsync(@Nullable T message, @Nullable Schema schema) throws PulsarException; /** * Sends a message to the specified topic in a non-blocking manner. diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarProducerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarProducerFactory.java index 3fb6a1ef0..9d0cd69b3 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarProducerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarProducerFactory.java @@ -21,10 +21,10 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.springframework.lang.Nullable; +import org.springframework.pulsar.PulsarException; /** * The strategy to create a {@link Producer} instance(s). @@ -34,6 +34,7 @@ * @author Chris Bono * @author Alexander Preuß * @author Christophe Bornet + * @author Jonas Geiregat */ public interface PulsarProducerFactory { @@ -43,9 +44,9 @@ public interface PulsarProducerFactory { * @param topic the topic the producer will send messages to or {@code null} to use * the default topic * @return the producer - * @throws PulsarClientException if any error occurs + * @throws PulsarException if any error occurs */ - Producer createProducer(Schema schema, @Nullable String topic) throws PulsarClientException; + Producer createProducer(Schema schema, @Nullable String topic); /** * Create a producer. @@ -54,10 +55,10 @@ public interface PulsarProducerFactory { * the default topic * @param customizer the optional customizer to apply to the producer builder * @return the producer - * @throws PulsarClientException if any error occurs + * @throws PulsarException if any error occurs */ Producer createProducer(Schema schema, @Nullable String topic, - @Nullable ProducerBuilderCustomizer customizer) throws PulsarClientException; + @Nullable ProducerBuilderCustomizer customizer); /** * Create a producer. @@ -71,10 +72,10 @@ Producer createProducer(Schema schema, @Nullable String topic, * @param customizers the optional list of customizers to apply to the producer * builder * @return the producer - * @throws PulsarClientException if any error occurs + * @throws PulsarException if any error occurs */ Producer createProducer(Schema schema, @Nullable String topic, @Nullable Collection encryptionKeys, - @Nullable List> customizers) throws PulsarClientException; + @Nullable List> customizers); /** * Get the default topic to use for all created producers. diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java index fb14809c3..2550903fe 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java @@ -215,7 +215,8 @@ private MessageId doSend(@Nullable String topic, @Nullable T message, @Nullable } catch (PulsarException ex) { throw ex; - } catch (Exception ex) { + } + catch (Exception ex) { throw new PulsarException(PulsarClientException.unwrap(ex)); } } @@ -264,7 +265,8 @@ private CompletableFuture doSendAsync(@Nullable String topic, @Nullab observation.error(ex); observation.stop(); throw ex; - } catch (Exception ex) { + } + catch (Exception ex) { var pulsarException = new PulsarException(PulsarClientException.unwrap(ex)); observation.error(pulsarException); observation.stop(); @@ -295,7 +297,8 @@ private Producer prepareProducerForSend(@Nullable String topic, @Nullable T m } catch (PulsarException ex) { throw ex; - } catch (Exception ex) { + } + catch (Exception ex) { throw new PulsarException(PulsarClientException.unwrap(ex)); } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java index 3c599a82f..233a1befa 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java @@ -53,6 +53,7 @@ import org.springframework.core.log.LogAccessor; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.pulsar.PulsarException; import org.springframework.pulsar.core.ConsumerBuilderConfigurationUtil; import org.springframework.pulsar.core.ConsumerBuilderCustomizer; import org.springframework.pulsar.core.PulsarConsumerFactory; @@ -291,8 +292,8 @@ else if (messageListener != null) { updateSubscriptionTypeFromConsumer(this.consumer); } } - catch (PulsarClientException e) { - DefaultPulsarMessageListenerContainer.this.logger.error(e, () -> "Pulsar client exceptions."); + catch (PulsarException e) { + DefaultPulsarMessageListenerContainer.this.logger.error(e, () -> "Pulsar exception."); } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarDeadLetterPublishingRecoverer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarDeadLetterPublishingRecoverer.java index 9bd9a1025..b1ba66f4e 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarDeadLetterPublishingRecoverer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarDeadLetterPublishingRecoverer.java @@ -20,7 +20,6 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.PulsarClientException; import org.springframework.core.log.LogAccessor; import org.springframework.pulsar.PulsarException; diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarClientFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarClientFactoryTests.java index 1a94bdac6..6e8eb934d 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarClientFactoryTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarClientFactoryTests.java @@ -20,7 +20,6 @@ import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assertions.assertThatRuntimeException; -import org.apache.pulsar.client.api.PulsarClientException; import org.junit.jupiter.api.Test; import org.springframework.mock.env.MockEnvironment; @@ -29,18 +28,19 @@ * Tests for {@link DefaultPulsarClientFactory}. * * @author Chris Bono + * @author Jonas Geiregat */ class DefaultPulsarClientFactoryTests { @Test - void constructWithServiceUrl() throws PulsarClientException { + void constructWithServiceUrl() { var clientFactory = new DefaultPulsarClientFactory("pulsar://localhost:5150"); assertThat(clientFactory.createClient()).hasFieldOrPropertyWithValue("conf.serviceUrl", "pulsar://localhost:5150"); } @Test - void constructWithCustomizer() throws PulsarClientException { + void constructWithCustomizer() { var clientFactory = new DefaultPulsarClientFactory( (clientBuilder) -> clientBuilder.serviceUrl("pulsar://localhost:5150")); assertThat(clientFactory.createClient()).hasFieldOrPropertyWithValue("conf.serviceUrl", @@ -63,14 +63,14 @@ void customizerThrowsException() { } @Test - void createsRestartableClientByDefault() throws PulsarClientException { + void createsRestartableClientByDefault() { var clientFactory = new DefaultPulsarClientFactory("pulsar://localhost:5150"); clientFactory.setEnvironment(new MockEnvironment()); assertThat(clientFactory.createClient()).isInstanceOf(PulsarClientProxy.class); } @Test - void createsRestartableClientWhenPropertySetTrue() throws PulsarClientException { + void createsRestartableClientWhenPropertySetTrue() { var clientFactory = new DefaultPulsarClientFactory("pulsar://localhost:5150"); var env = new MockEnvironment().withProperty("spring.pulsar.client.restartable", "true"); clientFactory.setEnvironment(env); @@ -78,7 +78,7 @@ void createsRestartableClientWhenPropertySetTrue() throws PulsarClientException } @Test - void createsDefaultClientWhenPropertySetFalse() throws PulsarClientException { + void createsDefaultClientWhenPropertySetFalse() { var clientFactory = new DefaultPulsarClientFactory("pulsar://localhost:5150"); var env = new MockEnvironment().withProperty("spring.pulsar.client.restartable", "false"); clientFactory.setEnvironment(env); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java index 8b583951d..67bdd6165 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java @@ -52,7 +52,6 @@ import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; -import org.springframework.pulsar.PulsarException; import org.springframework.pulsar.test.support.PulsarTestContainerSupport; import org.springframework.util.function.ThrowingConsumer; From 3329c63bcc78c70115d3d0d08075255d7d3f84bb Mon Sep 17 00:00:00 2001 From: JonasG Date: Wed, 24 Jan 2024 20:53:01 +0100 Subject: [PATCH 5/5] feat: only catch checked PulsarClientException --- .../pulsar/PulsarException.java | 4 +- .../core/CachingPulsarProducerFactory.java | 5 --- .../core/DefaultPulsarClientFactory.java | 7 +--- .../core/DefaultPulsarConsumerFactory.java | 7 +--- .../core/DefaultPulsarProducerFactory.java | 20 ++++----- .../pulsar/core/PulsarConsumerFactory.java | 7 +++- .../pulsar/core/PulsarOperations.java | 42 +++++++++++-------- .../pulsar/core/PulsarTemplate.java | 18 +------- 8 files changed, 46 insertions(+), 64 deletions(-) diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/PulsarException.java b/spring-pulsar/src/main/java/org/springframework/pulsar/PulsarException.java index 5904a9f1d..bbd28947a 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/PulsarException.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/PulsarException.java @@ -34,8 +34,8 @@ public PulsarException(String msg, Throwable cause) { super(msg, cause); } - public PulsarException(Exception exception) { - super(exception.getMessage(), exception.getCause()); + public PulsarException(Throwable cause) { + this(cause.getMessage(), cause); } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java index bfd85debe..2aa9317a5 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java @@ -108,15 +108,10 @@ protected Producer doCreateProducer(Schema schema, @Nullable String topic, private Producer createCacheableProducer(Schema schema, String topic, @Nullable Collection encryptionKeys, @Nullable List> customizers) { - try { var producer = super.doCreateProducer(schema, topic, encryptionKeys, customizers); return new ProducerWithCloseCallback<>(producer, (p) -> this.logger.trace(() -> "Client closed producer %s but will skip actual closing" .formatted(ProducerUtils.formatProducer(producer)))); - } - catch (PulsarClientException ex) { - throw new RuntimeException(ex); - } } /** diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java index f84d10265..d43238b45 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java @@ -68,11 +68,8 @@ public PulsarClient createClient() { try { return clientBuilder.build(); } - catch (PulsarException ex) { - throw ex; - } - catch (Exception ex) { - throw new PulsarException(PulsarClientException.unwrap(ex)); + catch (PulsarClientException ex) { + throw new PulsarException(ex); } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java index 9cbaf3b32..03a3f402a 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java @@ -105,11 +105,8 @@ public Consumer createConsumer(Schema schema, @Nullable Collection try { return consumerBuilder.subscribe(); } - catch (PulsarException ex) { - throw ex; - } - catch (Exception ex) { - throw new PulsarException(PulsarClientException.unwrap(ex)); + catch (PulsarClientException ex) { + throw new PulsarException(ex); } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarProducerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarProducerFactory.java index 911a6d1ce..d6b1c1604 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarProducerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarProducerFactory.java @@ -104,15 +104,7 @@ public DefaultPulsarProducerFactory(PulsarClient pulsarClient, @Nullable String @Override public Producer createProducer(Schema schema, @Nullable String topic) { - try { - return doCreateProducer(schema, topic, null, null); - } - catch (PulsarException ex) { - throw ex; - } - catch (Exception ex) { - throw new PulsarException(PulsarClientException.unwrap(ex)); - } + return doCreateProducer(schema, topic, null, null); } @Override @@ -159,8 +151,7 @@ public Producer createProducer(Schema schema, @Nullable String topic, * @throws PulsarClientException if any error occurs */ protected Producer doCreateProducer(Schema schema, @Nullable String topic, - @Nullable Collection encryptionKeys, @Nullable List> customizers) - throws PulsarClientException { + @Nullable Collection encryptionKeys, @Nullable List> customizers) { Objects.requireNonNull(schema, "Schema must be specified"); var resolvedTopic = resolveTopicName(topic); this.logger.trace(() -> "Creating producer for '%s' topic".formatted(resolvedTopic)); @@ -181,7 +172,12 @@ protected Producer doCreateProducer(Schema schema, @Nullable String topic, } producerBuilder.topic(resolvedTopic); - return producerBuilder.create(); + try { + return producerBuilder.create(); + } + catch (PulsarClientException ex) { + throw new PulsarException(ex); + } } protected String resolveTopicName(String userSpecifiedTopic) { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarConsumerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarConsumerFactory.java index 82ed601a0..f36d19f72 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarConsumerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarConsumerFactory.java @@ -22,6 +22,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.springframework.lang.Nullable; @@ -54,7 +55,8 @@ public interface PulsarConsumerFactory { * that the customizer is applied last and has the potential for overriding any * specified parameters or default properties. * @return the consumer - * @throws PulsarException if any error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ Consumer createConsumer(Schema schema, @Nullable Collection topics, @Nullable String subscriptionName, ConsumerBuilderCustomizer customizer); @@ -80,7 +82,8 @@ Consumer createConsumer(Schema schema, @Nullable Collection topics * builder. Note that the customizers are applied last and have the potential for * overriding any specified parameters or default properties. * @return the consumer - * @throws PulsarException if any error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ Consumer createConsumer(Schema schema, @Nullable Collection topics, @Nullable String subscriptionName, @Nullable Map metadataProperties, @Nullable List> customizers); diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java index 4a370bfe0..a43d85ffa 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java @@ -40,9 +40,10 @@ public interface PulsarOperations { * Sends a message to the default topic in a blocking manner. * @param message the message to send * @return the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ - MessageId send(@Nullable T message) throws PulsarException; + MessageId send(@Nullable T message); /** * Sends a message to the default topic in a blocking manner. @@ -50,9 +51,10 @@ public interface PulsarOperations { * @param schema the schema to use or {@code null} to send using the default schema * resolution * @return the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ - MessageId send(@Nullable T message, @Nullable Schema schema) throws PulsarException; + MessageId send(@Nullable T message, @Nullable Schema schema); /** * Sends a message to the specified topic in a blocking manner. @@ -60,9 +62,10 @@ public interface PulsarOperations { * default topic * @param message the message to send * @return the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ - MessageId send(@Nullable String topic, @Nullable T message) throws PulsarException; + MessageId send(@Nullable String topic, @Nullable T message); /** * Sends a message to the specified topic in a blocking manner. @@ -72,17 +75,19 @@ public interface PulsarOperations { * @param schema the schema to use or {@code null} to send using the default schema * resolution * @return the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ - MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema schema) throws PulsarException; + MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema schema); /** * Sends a message to the default topic in a non-blocking manner. * @param message the message to send * @return a future that holds the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ - CompletableFuture sendAsync(@Nullable T message) throws PulsarException; + CompletableFuture sendAsync(@Nullable T message); /** * Sends a message to the default topic in a non-blocking manner. @@ -90,9 +95,10 @@ public interface PulsarOperations { * @param schema the schema to use or {@code null} to send using the default schema * resolution * @return a future that holds the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ - CompletableFuture sendAsync(@Nullable T message, @Nullable Schema schema) throws PulsarException; + CompletableFuture sendAsync(@Nullable T message, @Nullable Schema schema); /** * Sends a message to the specified topic in a non-blocking manner. @@ -100,9 +106,10 @@ public interface PulsarOperations { * default topic * @param message the message to send * @return a future that holds the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ - CompletableFuture sendAsync(@Nullable String topic, @Nullable T message) throws PulsarException; + CompletableFuture sendAsync(@Nullable String topic, @Nullable T message); /** * Sends a message to the specified topic in a non-blocking manner. @@ -112,7 +119,8 @@ public interface PulsarOperations { * @param schema the schema to use or {@code null} to send using the default schema * resolution * @return a future that holds the id assigned by the broker to the published message - * @throws PulsarClientException if an error occurs + * @throws PulsarException if any {@link PulsarClientException} occurs communicating + * with Pulsar */ CompletableFuture sendAsync(@Nullable String topic, @Nullable T message, @Nullable Schema schema) throws PulsarException; @@ -172,7 +180,7 @@ interface SendMessageBuilder { * @return the id assigned by the broker to the published message * @throws PulsarException if an error occurs */ - MessageId send() throws PulsarException; + MessageId send(); /** * Uses the configured specification to send the message in a non-blocking manner. @@ -180,7 +188,7 @@ interface SendMessageBuilder { * message * @throws PulsarException if an error occurs */ - CompletableFuture sendAsync() throws PulsarException; + CompletableFuture sendAsync(); } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java index 2550903fe..563748e55 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java @@ -261,17 +261,11 @@ private CompletableFuture doSendAsync(@Nullable String topic, @Nullab ProducerUtils.closeProducerAsync(producer, this.logger); }); } - catch (PulsarException ex) { + catch (RuntimeException ex) { observation.error(ex); observation.stop(); throw ex; } - catch (Exception ex) { - var pulsarException = new PulsarException(PulsarClientException.unwrap(ex)); - observation.error(pulsarException); - observation.stop(); - throw pulsarException; - } } private Observation newObservation(PulsarMessageSenderContext senderContext) { @@ -292,15 +286,7 @@ private Producer prepareProducerForSend(@Nullable String topic, @Nullable T m if (producerCustomizer != null) { customizers.add(producerCustomizer); } - try { - return this.producerFactory.createProducer(resolvedSchema, topic, encryptionKeys, customizers); - } - catch (PulsarException ex) { - throw ex; - } - catch (Exception ex) { - throw new PulsarException(PulsarClientException.unwrap(ex)); - } + return this.producerFactory.createProducer(resolvedSchema, topic, encryptionKeys, customizers); } public static class SendMessageBuilderImpl implements SendMessageBuilder {