Skip to content

Commit 1e60c98

Browse files
author
Mikhail Polivakha
committed
spring-projectsGH-3732 make mqttClient initialize lazily
1 parent f3d3694 commit 1e60c98

File tree

3 files changed

+35
-31
lines changed

3 files changed

+35
-31
lines changed

Diff for: spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java

+4-11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.mqtt.inbound;
1818

1919
import java.util.LinkedHashSet;
20+
import java.util.Objects;
2021
import java.util.Set;
2122
import java.util.concurrent.locks.Lock;
2223
import java.util.concurrent.locks.ReentrantLock;
@@ -140,7 +141,7 @@ protected MqttMessageConverter getConverter() {
140141
}
141142

142143
@ManagedAttribute
143-
public String[] getTopic() {
144+
public String[] getTopicNames() {
144145
this.topicLock.lock();
145146
try {
146147
String[] topicNames = new String[this.topics.size()];
@@ -331,15 +332,7 @@ public boolean equals(Object obj) {
331332
return false;
332333
}
333334
Topic other = (Topic) obj;
334-
if (this.topic == null) {
335-
if (other.topic != null) {
336-
return false;
337-
}
338-
}
339-
else if (!this.topic.equals(other.topic)) {
340-
return false;
341-
}
342-
return true;
335+
return Objects.equals(this.topic, other.topic);
343336
}
344337

345338
@Override

Diff for: spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -193,7 +193,7 @@ protected synchronized void doStop() {
193193
|| (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_CLEAN)
194194
&& this.cleanSession)) {
195195

196-
this.client.unsubscribe(getTopic());
196+
this.client.unsubscribe(getTopicNames());
197197
}
198198
}
199199
catch (MqttException ex) {
@@ -270,7 +270,7 @@ private synchronized void connectAndSubscribe() throws MqttException { // NOSONA
270270
}
271271

272272
this.topicLock.lock();
273-
String[] topics = getTopic();
273+
String[] topics = getTopicNames();
274274
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
275275
try {
276276
this.client.connect(connectionOptions);

Diff for: spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

+28-17
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -141,26 +141,29 @@ public void setHeaderMapper(HeaderMapper<MqttProperties> headerMapper) {
141141
@Override
142142
protected void onInit() {
143143
super.onInit();
144+
initializeMqttAsyncClientIfRequired();
145+
if (this.messageConverter == null) {
146+
setMessageConverter(getBeanFactory()
147+
.getBean(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME,
148+
SmartMessageConverter.class));
149+
}
150+
}
151+
152+
private void initializeMqttAsyncClient() {
144153
try {
145154
this.mqttClient = new MqttAsyncClient(getUrl(), getClientId(), this.persistence);
146155
this.mqttClient.setCallback(this);
147156
this.mqttClient.setManualAcks(isManualAcks());
148-
}
149-
catch (MqttException ex) {
157+
} catch (MqttException ex) {
150158
throw new BeanCreationException("Cannot create 'MqttAsyncClient' for: " + getComponentName(), ex);
151159
}
152-
if (this.messageConverter == null) {
153-
setMessageConverter(getBeanFactory()
154-
.getBean(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME,
155-
SmartMessageConverter.class));
156-
}
157160
}
158161

159162
@Override
160163
protected void doStart() {
161164
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
162165
this.topicLock.lock();
163-
String[] topics = getTopic();
166+
String[] topics = getTopicNames();
164167
try {
165168
this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
166169
if (topics.length > 0) {
@@ -187,33 +190,40 @@ protected void doStart() {
187190
@Override
188191
protected void doStop() {
189192
this.topicLock.lock();
190-
String[] topics = getTopic();
193+
initializeMqttAsyncClientIfRequired();
194+
String[] topics = getTopicNames();
191195
try {
192196
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
193197
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
194-
}
195-
catch (MqttException ex) {
198+
} catch (MqttException ex) {
196199
logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(topics));
197-
}
198-
finally {
200+
} finally {
199201
this.topicLock.unlock();
200202
}
201203
}
202204

205+
private void initializeMqttAsyncClientIfRequired() {
206+
if (mqttClient == null) {
207+
initializeMqttAsyncClient();
208+
}
209+
}
210+
203211
@Override
204212
public void destroy() {
205213
super.destroy();
206214
try {
207-
this.mqttClient.close(true);
208-
}
209-
catch (MqttException ex) {
215+
if (mqttClient != null) {
216+
this.mqttClient.close(true);
217+
}
218+
} catch (MqttException ex) {
210219
logger.error(ex, "Failed to close 'MqttAsyncClient'");
211220
}
212221
}
213222

214223
@Override
215224
public void addTopic(String topic, int qos) {
216225
this.topicLock.lock();
226+
initializeMqttAsyncClientIfRequired();
217227
try {
218228
this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout());
219229
super.addTopic(topic, qos);
@@ -229,6 +239,7 @@ public void addTopic(String topic, int qos) {
229239
@Override
230240
public void removeTopic(String... topic) {
231241
this.topicLock.lock();
242+
initializeMqttAsyncClient();
232243
try {
233244
this.mqttClient.unsubscribe(topic).waitForCompletion(getCompletionTimeout());
234245
super.removeTopic(topic);

0 commit comments

Comments
 (0)