Skip to content

Commit d338577

Browse files
committed
spring-projectsGH-3685: Share MQTT connection across components
Fixes spring-projects#3685 Add new tests with reconnect cases. Other code improvements after the code review: * Adjust javadocs according to standards * Remove `setClientManager` and use exclusive ctors * Make automatic reconnects using the v3 client instead of manually using task scheduler
1 parent 9f47cc8 commit d338577

12 files changed

+376
-156
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java

+7
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@
2121

2222
import org.springframework.util.Assert;
2323

24+
/**
25+
* @param <T> MQTT client type
26+
*
27+
* @author Artem Vozhdayenko
28+
*
29+
* @since 6.0
30+
*/
2431
public abstract class AbstractMqttClientManager<T> implements ClientManager<T> {
2532

2633
protected final Log logger = LogFactory.getLog(this.getClass());

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java

+11
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,17 @@
1818

1919
import org.springframework.context.SmartLifecycle;
2020

21+
/**
22+
* A utility abstraction over MQTT client which can be used in any MQTT-related component
23+
* without need to handle generic client callbacks, reconnects etc.
24+
* Using this manager in multiple MQTT integrations will preserve a single connection.
25+
*
26+
* @param <T> MQTT client type
27+
*
28+
* @author Artem Vozhdayenko
29+
*
30+
* @since 6.0
31+
*/
2132
public interface ClientManager<T> extends SmartLifecycle {
2233

2334
T getClient();

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java

+31-78
Original file line numberDiff line numberDiff line change
@@ -16,79 +16,59 @@
1616

1717
package org.springframework.integration.mqtt.core;
1818

19-
import java.time.Instant;
20-
import java.util.concurrent.ScheduledFuture;
21-
2219
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
2320
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
2421
import org.eclipse.paho.client.mqttv3.MqttCallback;
2522
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
2623
import org.eclipse.paho.client.mqttv3.MqttException;
2724
import org.eclipse.paho.client.mqttv3.MqttMessage;
2825

29-
import org.springframework.beans.factory.BeanFactory;
30-
import org.springframework.beans.factory.BeanFactoryAware;
31-
import org.springframework.beans.factory.InitializingBean;
32-
import org.springframework.integration.context.IntegrationContextUtils;
33-
import org.springframework.scheduling.TaskScheduler;
3426
import org.springframework.util.Assert;
3527

36-
public class Mqttv3ClientManager extends AbstractMqttClientManager<IMqttAsyncClient>
37-
implements MqttCallback, InitializingBean, BeanFactoryAware {
38-
39-
/**
40-
* The default reconnect timeout in millis.
41-
*/
42-
private static final long DEFAULT_RECOVERY_INTERVAL = 10_000;
28+
/**
29+
* @author Artem Vozhdayenko
30+
* @since 6.0
31+
*/
32+
public class Mqttv3ClientManager extends AbstractMqttClientManager<IMqttAsyncClient> implements MqttCallback {
4333

4434
private final MqttPahoClientFactory clientFactory;
4535

46-
private BeanFactory beanFactory;
47-
48-
private TaskScheduler taskScheduler;
49-
50-
private volatile ScheduledFuture<?> scheduledReconnect;
51-
5236
private volatile IMqttAsyncClient client;
5337

54-
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
55-
5638
public Mqttv3ClientManager(MqttPahoClientFactory clientFactory, String clientId) {
5739
super(clientId);
5840
Assert.notNull(clientFactory, "'clientFactory' is required");
5941
this.clientFactory = clientFactory;
60-
String[] serverURIs = clientFactory.getConnectionOptions().getServerURIs();
42+
MqttConnectOptions connectionOptions = clientFactory.getConnectionOptions();
43+
String[] serverURIs = connectionOptions.getServerURIs();
6144
Assert.notEmpty(serverURIs, "'serverURIs' must be provided in the 'MqttConnectionOptions'");
6245
setUrl(serverURIs[0]);
46+
if (!connectionOptions.isAutomaticReconnect()) {
47+
logger.info("If this `ClientManager` is used from message-driven channel adapters, " +
48+
"it is recommended to set 'automaticReconnect' MQTT connection option. " +
49+
"Otherwise connection check and reconnect should be done manually.");
50+
}
6351
}
6452

6553
public Mqttv3ClientManager(String url, String clientId) {
66-
super(clientId);
54+
this(buildDefaultClientFactory(url), clientId);
55+
}
56+
57+
private static MqttPahoClientFactory buildDefaultClientFactory(String url) {
6758
Assert.notNull(url, "'url' is required");
68-
setUrl(url);
6959
MqttConnectOptions connectOptions = new MqttConnectOptions();
7060
connectOptions.setServerURIs(new String[]{ url });
61+
connectOptions.setAutomaticReconnect(true);
7162
DefaultMqttPahoClientFactory defaultFactory = new DefaultMqttPahoClientFactory();
7263
defaultFactory.setConnectionOptions(connectOptions);
73-
this.clientFactory = defaultFactory;
64+
return defaultFactory;
7465
}
7566

7667
@Override
7768
public IMqttAsyncClient getClient() {
7869
return this.client;
7970
}
8071

81-
@Override
82-
public void afterPropertiesSet() {
83-
this.taskScheduler = IntegrationContextUtils.getTaskScheduler(this.beanFactory);
84-
}
85-
86-
@Override
87-
public void setBeanFactory(BeanFactory beanFactory) {
88-
Assert.notNull(beanFactory, "'beanFactory' must not be null");
89-
this.beanFactory = beanFactory;
90-
}
91-
9272
@Override
9373
public synchronized void start() {
9474
if (this.client == null) {
@@ -102,12 +82,20 @@ public synchronized void start() {
10282
}
10383
}
10484
try {
105-
connect();
85+
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
86+
this.client.connect(options).waitForCompletion(options.getConnectionTimeout());
10687
}
10788
catch (MqttException e) {
108-
logger.error("could not start client manager, scheduling reconnect, client_id=" +
109-
this.client.getClientId(), e);
110-
scheduleReconnect();
89+
logger.error("could not start client manager, client_id=" + this.client.getClientId(), e);
90+
91+
if (this.clientFactory.getConnectionOptions().isAutomaticReconnect()) {
92+
try {
93+
this.client.reconnect();
94+
}
95+
catch (MqttException ex) {
96+
logger.error("MQTT client failed to re-connect.", ex);
97+
}
98+
}
11199
}
112100
}
113101

@@ -140,9 +128,7 @@ public synchronized boolean isRunning() {
140128

141129
@Override
142130
public synchronized void connectionLost(Throwable cause) {
143-
logger.error("connection lost, scheduling reconnect, client_id=" + this.client.getClientId(),
144-
cause);
145-
scheduleReconnect();
131+
logger.error("connection lost, client_id=" + this.client.getClientId(), cause);
146132
}
147133

148134
@Override
@@ -155,37 +141,4 @@ public void deliveryComplete(IMqttDeliveryToken token) {
155141
// nor this manager concern
156142
}
157143

158-
public long getRecoveryInterval() {
159-
return this.recoveryInterval;
160-
}
161-
162-
public void setRecoveryInterval(long recoveryInterval) {
163-
this.recoveryInterval = recoveryInterval;
164-
}
165-
166-
private synchronized void connect() throws MqttException {
167-
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
168-
this.client.connect(options).waitForCompletion(options.getConnectionTimeout());
169-
}
170-
171-
private synchronized void scheduleReconnect() {
172-
if (this.scheduledReconnect != null) {
173-
this.scheduledReconnect.cancel(false);
174-
}
175-
this.scheduledReconnect = this.taskScheduler.schedule(() -> {
176-
try {
177-
if (this.client.isConnected()) {
178-
return;
179-
}
180-
181-
connect();
182-
this.scheduledReconnect = null;
183-
}
184-
catch (MqttException e) {
185-
logger.error("could not reconnect", e);
186-
scheduleReconnect();
187-
}
188-
}, Instant.now().plusMillis(getRecoveryInterval()));
189-
}
190-
191144
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java

+27-10
Original file line numberDiff line numberDiff line change
@@ -28,33 +28,41 @@
2828

2929
import org.springframework.util.Assert;
3030

31+
/**
32+
* @author Artem Vozhdayenko
33+
* @since 6.0
34+
*/
3135
public class Mqttv5ClientManager extends AbstractMqttClientManager<IMqttAsyncClient> implements MqttCallback {
3236

3337
private final MqttConnectionOptions connectionOptions;
3438

3539
private volatile IMqttAsyncClient client;
3640

37-
public Mqttv5ClientManager(String url, String clientId) {
38-
super(clientId);
39-
Assert.notNull(url, "'url' is required");
40-
setUrl(url);
41-
this.connectionOptions = new MqttConnectionOptions();
42-
this.connectionOptions.setServerURIs(new String[]{ url });
43-
this.connectionOptions.setAutomaticReconnect(true);
44-
}
45-
4641
public Mqttv5ClientManager(MqttConnectionOptions connectionOptions, String clientId) {
4742
super(clientId);
4843
Assert.notNull(connectionOptions, "'connectionOptions' is required");
4944
this.connectionOptions = connectionOptions;
5045
if (!this.connectionOptions.isAutomaticReconnect()) {
51-
logger.warn("It is recommended to set 'automaticReconnect' MQTT connection option. " +
46+
logger.info("If this `ClientManager` is used from message-driven channel adapters, " +
47+
"it is recommended to set 'automaticReconnect' MQTT connection option. " +
5248
"Otherwise connection check and reconnect should be done manually.");
5349
}
5450
Assert.notEmpty(connectionOptions.getServerURIs(), "'serverURIs' must be provided in the 'MqttConnectionOptions'");
5551
setUrl(connectionOptions.getServerURIs()[0]);
5652
}
5753

54+
public Mqttv5ClientManager(String url, String clientId) {
55+
this(buildDefaultConnectionOptions(url), clientId);
56+
}
57+
58+
private static MqttConnectionOptions buildDefaultConnectionOptions(String url) {
59+
Assert.notNull(url, "'url' is required");
60+
var connectionOptions = new MqttConnectionOptions();
61+
connectionOptions.setServerURIs(new String[]{ url });
62+
connectionOptions.setAutomaticReconnect(true);
63+
return connectionOptions;
64+
}
65+
5866
@Override
5967
public IMqttAsyncClient getClient() {
6068
return this.client;
@@ -78,6 +86,15 @@ public synchronized void start() {
7886
}
7987
catch (MqttException e) {
8088
logger.error("could not start client manager, client_id=" + this.client.getClientId(), e);
89+
90+
if (this.connectionOptions.isAutomaticReconnect()) {
91+
try {
92+
this.client.reconnect();
93+
}
94+
catch (MqttException ex) {
95+
logger.error("MQTT client failed to re-connect.", ex);
96+
}
97+
}
8198
}
8299
}
83100

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java

+19-11
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
* @author Artem Bilan
4646
* @author Trung Pham
4747
* @author Mikhail Polivakha
48+
* @author Artem Vozhdayenko
4849
*
4950
* @since 4.0
5051
*
@@ -59,9 +60,9 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter<T> extends Message
5960
*/
6061
public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L;
6162

62-
private final String url;
63+
private String url;
6364

64-
private final String clientId;
65+
private String clientId;
6566

6667
private final Set<Topic> topics;
6768

@@ -79,26 +80,33 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter<T> extends Message
7980

8081
public AbstractMqttMessageDrivenChannelAdapter(@Nullable String url, String clientId, String... topic) {
8182
Assert.hasText(clientId, "'clientId' cannot be null or empty");
82-
Assert.notNull(topic, "'topics' cannot be null");
83-
Assert.noNullElements(topic, "'topics' cannot have null elements");
8483
this.url = url;
8584
this.clientId = clientId;
86-
this.topics = new LinkedHashSet<>();
85+
this.topics = initTopics(topic);
86+
}
87+
88+
AbstractMqttMessageDrivenChannelAdapter(ClientManager<T> clientManager, String... topic) {
89+
Assert.notNull(clientManager, "'clientManager' cannot be null");
90+
this.clientManager = clientManager;
91+
this.topics = initTopics(topic);
92+
}
93+
94+
private Set<Topic> initTopics(String[] topic) {
95+
Assert.notNull(topic, "'topics' cannot be null");
96+
Assert.noNullElements(topic, "'topics' cannot have null elements");
97+
final Set<Topic> initialTopics = new LinkedHashSet<>();
98+
int defaultQos = 1;
8799
for (String t : topic) {
88-
this.topics.add(new Topic(t, 1));
100+
initialTopics.add(new Topic(t, defaultQos));
89101
}
102+
return initialTopics;
90103
}
91104

92105
public void setConverter(MqttMessageConverter converter) {
93106
Assert.notNull(converter, "'converter' cannot be null");
94107
this.converter = converter;
95108
}
96109

97-
public void setClientManager(ClientManager<T> clientManager) {
98-
Assert.notNull(clientManager, "'clientManager' cannot be null");
99-
this.clientManager = clientManager;
100-
}
101-
102110
public ClientManager<T> getClientManager() {
103111
return this.clientManager;
104112
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

+30-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.context.ApplicationEventPublisher;
3434
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3535
import org.springframework.integration.acks.SimpleAcknowledgment;
36+
import org.springframework.integration.mqtt.core.ClientManager;
3637
import org.springframework.integration.mqtt.core.ConsumerStopAction;
3738
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
3839
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
@@ -57,6 +58,7 @@
5758
*
5859
* @author Gary Russell
5960
* @author Artem Bilan
61+
* @author Artem Vozhdayenko
6062
*
6163
* @since 4.0
6264
*
@@ -130,6 +132,32 @@ public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String..
130132
this(url, clientId, new DefaultMqttPahoClientFactory(), topic);
131133
}
132134

135+
/**
136+
* Use this constructor when you need to use a single {@link ClientManager}
137+
* (for instance, to reuse an MQTT connection) and a specific {@link MqttConnectOptions}.
138+
* @param clientManager The client manager.
139+
* @param connectOptions The connection options.
140+
* @param topic The topic(s).
141+
*/
142+
public MqttPahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient> clientManager,
143+
MqttConnectOptions connectOptions, String... topic) {
144+
145+
super(clientManager, topic);
146+
var factory = new DefaultMqttPahoClientFactory();
147+
factory.setConnectionOptions(connectOptions);
148+
this.clientFactory = factory;
149+
}
150+
151+
/**
152+
* Use this constructor when you need to use a single {@link ClientManager}
153+
* (for instance, to reuse an MQTT connection).
154+
* @param clientManager The client manager.
155+
* @param topic The topic(s).
156+
*/
157+
public MqttPahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient> clientManager, String... topic) {
158+
this(clientManager, new MqttConnectOptions(), topic);
159+
}
160+
133161
/**
134162
* Set the completion timeout when disconnecting. Not settable using the namespace.
135163
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
@@ -279,12 +307,12 @@ private synchronized void connectAndSubscribe() throws MqttException { // NOSONA
279307
if (this.consumerStopAction == null) {
280308
this.consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN;
281309
}
282-
Assert.state(getUrl() != null || connectionOptions.getServerURIs() != null,
283-
"If no 'url' provided, connectionOptions.getServerURIs() must not be null");
284310

285311
IMqttAsyncClient clientInstance;
286312

287313
if (getClientManager() == null) {
314+
Assert.state(getUrl() != null || connectionOptions.getServerURIs() != null,
315+
"If no 'url' provided, connectionOptions.getServerURIs() must not be null");
288316
this.client = this.clientFactory.getAsyncClientInstance(getUrl(), getClientId());
289317
this.client.setCallback(this);
290318
clientInstance = this.client;

0 commit comments

Comments
 (0)