Skip to content

Commit 3a701d3

Browse files
Mikhail Polivakhamipo256
Mikhail Polivakha
authored andcommitted
GH-3732 add mqqtClient initialization check before adding or removing topic
1 parent f3d3694 commit 3a701d3

File tree

4 files changed

+59
-26
lines changed

4 files changed

+59
-26
lines changed

Diff for: spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java

+3-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.mqtt.inbound;
1818

1919
import java.util.LinkedHashSet;
20+
import java.util.Objects;
2021
import java.util.Set;
2122
import java.util.concurrent.locks.Lock;
2223
import java.util.concurrent.locks.ReentrantLock;
@@ -331,15 +332,7 @@ public boolean equals(Object obj) {
331332
return false;
332333
}
333334
Topic other = (Topic) obj;
334-
if (this.topic == null) {
335-
if (other.topic != null) {
336-
return false;
337-
}
338-
}
339-
else if (!this.topic.equals(other.topic)) {
340-
return false;
341-
}
342-
return true;
335+
return Objects.equals(this.topic, other.topic);
343336
}
344337

345338
@Override

Diff for: spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

Diff for: spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

+31-15
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -65,6 +65,7 @@
6565
* See {@link #setPayloadType} for more information about type conversion.
6666
*
6767
* @author Artem Bilan
68+
* @author Mikhail Polivakha
6869
*
6970
* @since 5.5.5
7071
*
@@ -141,13 +142,15 @@ public void setHeaderMapper(HeaderMapper<MqttProperties> headerMapper) {
141142
@Override
142143
protected void onInit() {
143144
super.onInit();
144-
try {
145-
this.mqttClient = new MqttAsyncClient(getUrl(), getClientId(), this.persistence);
146-
this.mqttClient.setCallback(this);
147-
this.mqttClient.setManualAcks(isManualAcks());
148-
}
149-
catch (MqttException ex) {
150-
throw new BeanCreationException("Cannot create 'MqttAsyncClient' for: " + getComponentName(), ex);
145+
if (mqttClient == null) {
146+
try {
147+
this.mqttClient = new MqttAsyncClient(getUrl(), getClientId(), this.persistence);
148+
this.mqttClient.setCallback(this);
149+
this.mqttClient.setManualAcks(isManualAcks());
150+
}
151+
catch (MqttException ex) {
152+
throw new BeanCreationException("Cannot create 'MqttAsyncClient' for: " + getComponentName(), ex);
153+
}
151154
}
152155
if (this.messageConverter == null) {
153156
setMessageConverter(getBeanFactory()
@@ -189,8 +192,12 @@ protected void doStop() {
189192
this.topicLock.lock();
190193
String[] topics = getTopic();
191194
try {
192-
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
193-
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
195+
if (mqttClient != null && mqttClient.isConnected()) {
196+
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
197+
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
198+
} else {
199+
logger.warn("An attempt to stop mqtt client without having one initialized or connected");
200+
}
194201
}
195202
catch (MqttException ex) {
196203
logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(topics));
@@ -204,9 +211,10 @@ protected void doStop() {
204211
public void destroy() {
205212
super.destroy();
206213
try {
207-
this.mqttClient.close(true);
208-
}
209-
catch (MqttException ex) {
214+
if (mqttClient != null) {
215+
this.mqttClient.close(true);
216+
}
217+
} catch (MqttException ex) {
210218
logger.error(ex, "Failed to close 'MqttAsyncClient'");
211219
}
212220
}
@@ -215,8 +223,12 @@ public void destroy() {
215223
public void addTopic(String topic, int qos) {
216224
this.topicLock.lock();
217225
try {
218-
this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout());
219226
super.addTopic(topic, qos);
227+
if (mqttClient != null && mqttClient.isConnected()) {
228+
this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout());
229+
} else {
230+
logger.warn(String.format("An attempt to add topic : '%s' with QoS : '%s' without having mqtt client initialized or connected", topic, qos));
231+
}
220232
}
221233
catch (MqttException ex) {
222234
throw new MessagingException("Failed to subscribe to topic " + topic, ex);
@@ -230,7 +242,11 @@ public void addTopic(String topic, int qos) {
230242
public void removeTopic(String... topic) {
231243
this.topicLock.lock();
232244
try {
233-
this.mqttClient.unsubscribe(topic).waitForCompletion(getCompletionTimeout());
245+
if (mqttClient != null && mqttClient.isConnected()) {
246+
this.mqttClient.unsubscribe(topic).waitForCompletion(getCompletionTimeout());
247+
} else {
248+
logger.warn(String.format("An attempt to remove topics : '%s' without having mqtt client initialized or connected", Arrays.toString(topic)));
249+
}
234250
super.removeTopic(topic);
235251
}
236252
catch (MqttException ex) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package org.springframework.integration.mqtt.inbound;
2+
3+
import org.junit.jupiter.api.Assertions;
4+
import org.junit.jupiter.api.Test;
5+
6+
/**
7+
* Mqttv5PahoMessageDrivenChannelAdapterTest's unit tests
8+
*
9+
* @author Mikhail Polivakha
10+
*/
11+
class Mqttv5PahoMessageDrivenChannelAdapterTest {
12+
13+
@Test //GH-3732
14+
public void testNoNpeIsNotThrownInCaseDoInitIsNotInvokedBeforeTopicAddition() {
15+
Mqttv5PahoMessageDrivenChannelAdapter channelAdapter = new Mqttv5PahoMessageDrivenChannelAdapter("tcp://mock-url.com:8091", "mock-client-id", "123");
16+
Assertions.assertDoesNotThrow(() -> channelAdapter.addTopic("abc", 1));
17+
}
18+
19+
@Test //GH-3732
20+
public void testNoNpeIsNotThrownInCaseDoInitIsNotInvokedBeforeTopicRemoval() {
21+
Mqttv5PahoMessageDrivenChannelAdapter channelAdapter = new Mqttv5PahoMessageDrivenChannelAdapter("tcp://mock-url.com:8091", "mock-client-id", "123");
22+
Assertions.assertDoesNotThrow(() -> channelAdapter.removeTopic("abc"));
23+
}
24+
}

0 commit comments

Comments
 (0)