Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 8d88fbf

Browse files
committedMar 17, 2022
GH-3732 add unit tests for Mqttv5PahoMessageDrivenChannelAdapter
1 parent 2e6d487 commit 8d88fbf

File tree

2 files changed

+36
-20
lines changed

2 files changed

+36
-20
lines changed
 

‎spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

+12-20
Original file line numberDiff line numberDiff line change
@@ -142,25 +142,23 @@ public void setHeaderMapper(HeaderMapper<MqttProperties> headerMapper) {
142142
@Override
143143
protected void onInit() {
144144
super.onInit();
145-
initializeMqttAsyncClientIfRequired();
145+
if (mqttClient == null) {
146+
try {
147+
this.mqttClient = new MqttAsyncClient(getUrl(), getClientId(), this.persistence);
148+
this.mqttClient.setCallback(this);
149+
this.mqttClient.setManualAcks(isManualAcks());
150+
}
151+
catch (MqttException ex) {
152+
throw new BeanCreationException("Cannot create 'MqttAsyncClient' for: " + getComponentName(), ex);
153+
}
154+
}
146155
if (this.messageConverter == null) {
147156
setMessageConverter(getBeanFactory()
148157
.getBean(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME,
149158
SmartMessageConverter.class));
150159
}
151160
}
152161

153-
private void initializeMqttAsyncClient() {
154-
try {
155-
this.mqttClient = new MqttAsyncClient(getUrl(), getClientId(), this.persistence);
156-
this.mqttClient.setCallback(this);
157-
this.mqttClient.setManualAcks(isManualAcks());
158-
}
159-
catch (MqttException ex) {
160-
throw new BeanCreationException("Cannot create 'MqttAsyncClient' for: " + getComponentName(), ex);
161-
}
162-
}
163-
164162
@Override
165163
protected void doStart() {
166164
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
@@ -209,12 +207,6 @@ protected void doStop() {
209207
}
210208
}
211209

212-
private void initializeMqttAsyncClientIfRequired() {
213-
if (mqttClient == null) {
214-
initializeMqttAsyncClient();
215-
}
216-
}
217-
218210
@Override
219211
public void destroy() {
220212
super.destroy();
@@ -231,11 +223,11 @@ public void destroy() {
231223
public void addTopic(String topic, int qos) {
232224
this.topicLock.lock();
233225
try {
226+
super.addTopic(topic, qos);
234227
if (mqttClient != null && mqttClient.isConnected()) {
235228
this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout());
236-
super.addTopic(topic, qos);
237229
} else {
238-
logger.warn(String.format("An attempt to add topic : '%s' with QoS : '%s' without having mqtt client initialized or connected, the topic will not be added!", topic, qos));
230+
logger.warn(String.format("An attempt to add topic : '%s' with QoS : '%s' without having mqtt client initialized or connected", topic, qos));
239231
}
240232
}
241233
catch (MqttException ex) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package org.springframework.integration.mqtt.inbound;
2+
3+
import org.junit.jupiter.api.Assertions;
4+
import org.junit.jupiter.api.Test;
5+
6+
/**
7+
* Mqttv5PahoMessageDrivenChannelAdapterTest's unit tests
8+
*
9+
* @author Mikhail Polivakha
10+
*/
11+
class Mqttv5PahoMessageDrivenChannelAdapterTest {
12+
13+
@Test //GH-3732
14+
public void testNoNpeIsNotThrownInCaseDoInitIsNotInvokedBeforeTopicAddition() {
15+
Mqttv5PahoMessageDrivenChannelAdapter channelAdapter = new Mqttv5PahoMessageDrivenChannelAdapter("tcp://mock-url.com:8091", "mock-client-id", "123");
16+
Assertions.assertDoesNotThrow(() -> channelAdapter.addTopic("abc", 1));
17+
}
18+
19+
@Test //GH-3732
20+
public void testNoNpeIsNotThrownInCaseDoInitIsNotInvokedBeforeTopicRemoval() {
21+
Mqttv5PahoMessageDrivenChannelAdapter channelAdapter = new Mqttv5PahoMessageDrivenChannelAdapter("tcp://mock-url.com:8091", "mock-client-id", "123");
22+
Assertions.assertDoesNotThrow(() -> channelAdapter.removeTopic("abc"));
23+
}
24+
}

0 commit comments

Comments
 (0)
Please sign in to comment.