Skip to content

Commit 2e6d487

Browse files
committed
GH-3732 adjust PR after code review
1 parent 6c5b610 commit 2e6d487

File tree

3 files changed

+29
-16
lines changed

3 files changed

+29
-16
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ protected MqttMessageConverter getConverter() {
141141
}
142142

143143
@ManagedAttribute
144-
public String[] getTopicNames() {
144+
public String[] getTopic() {
145145
this.topicLock.lock();
146146
try {
147147
String[] topicNames = new String[this.topics.size()];

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ protected synchronized void doStop() {
193193
|| (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_CLEAN)
194194
&& this.cleanSession)) {
195195

196-
this.client.unsubscribe(getTopicNames());
196+
this.client.unsubscribe(getTopic());
197197
}
198198
}
199199
catch (MqttException ex) {
@@ -270,7 +270,7 @@ private synchronized void connectAndSubscribe() throws MqttException { // NOSONA
270270
}
271271

272272
this.topicLock.lock();
273-
String[] topics = getTopicNames();
273+
String[] topics = getTopic();
274274
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
275275
try {
276276
this.client.connect(connectionOptions);

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

+26-13
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
* See {@link #setPayloadType} for more information about type conversion.
6666
*
6767
* @author Artem Bilan
68+
* @author Mikhail Polivakha
6869
*
6970
* @since 5.5.5
7071
*
@@ -154,7 +155,8 @@ private void initializeMqttAsyncClient() {
154155
this.mqttClient = new MqttAsyncClient(getUrl(), getClientId(), this.persistence);
155156
this.mqttClient.setCallback(this);
156157
this.mqttClient.setManualAcks(isManualAcks());
157-
} catch (MqttException ex) {
158+
}
159+
catch (MqttException ex) {
158160
throw new BeanCreationException("Cannot create 'MqttAsyncClient' for: " + getComponentName(), ex);
159161
}
160162
}
@@ -163,7 +165,7 @@ private void initializeMqttAsyncClient() {
163165
protected void doStart() {
164166
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
165167
this.topicLock.lock();
166-
String[] topics = getTopicNames();
168+
String[] topics = getTopic();
167169
try {
168170
this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
169171
if (topics.length > 0) {
@@ -190,14 +192,19 @@ protected void doStart() {
190192
@Override
191193
protected void doStop() {
192194
this.topicLock.lock();
193-
initializeMqttAsyncClientIfRequired();
194-
String[] topics = getTopicNames();
195+
String[] topics = getTopic();
195196
try {
196-
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
197-
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
198-
} catch (MqttException ex) {
197+
if (mqttClient != null && mqttClient.isConnected()) {
198+
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
199+
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
200+
} else {
201+
logger.warn("An attempt to stop mqtt client without having one initialized or connected");
202+
}
203+
}
204+
catch (MqttException ex) {
199205
logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(topics));
200-
} finally {
206+
}
207+
finally {
201208
this.topicLock.unlock();
202209
}
203210
}
@@ -223,10 +230,13 @@ public void destroy() {
223230
@Override
224231
public void addTopic(String topic, int qos) {
225232
this.topicLock.lock();
226-
initializeMqttAsyncClientIfRequired();
227233
try {
228-
this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout());
229-
super.addTopic(topic, qos);
234+
if (mqttClient != null && mqttClient.isConnected()) {
235+
this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout());
236+
super.addTopic(topic, qos);
237+
} else {
238+
logger.warn(String.format("An attempt to add topic : '%s' with QoS : '%s' without having mqtt client initialized or connected, the topic will not be added!", topic, qos));
239+
}
230240
}
231241
catch (MqttException ex) {
232242
throw new MessagingException("Failed to subscribe to topic " + topic, ex);
@@ -239,9 +249,12 @@ public void addTopic(String topic, int qos) {
239249
@Override
240250
public void removeTopic(String... topic) {
241251
this.topicLock.lock();
242-
initializeMqttAsyncClient();
243252
try {
244-
this.mqttClient.unsubscribe(topic).waitForCompletion(getCompletionTimeout());
253+
if (mqttClient != null && mqttClient.isConnected()) {
254+
this.mqttClient.unsubscribe(topic).waitForCompletion(getCompletionTimeout());
255+
} else {
256+
logger.warn(String.format("An attempt to remove topics : '%s' without having mqtt client initialized or connected", Arrays.toString(topic)));
257+
}
245258
super.removeTopic(topic);
246259
}
247260
catch (MqttException ex) {

0 commit comments

Comments
 (0)