Skip to content

Commit f3fba7d

Browse files
committed
spring-projectsGH-3685: Share MQTT connection across components
Fixes spring-projects#3685 Other fixes and improvements after code review: * Changes around fields, methods, ctors visibility * Removed contradictory ctors * Reduce amount of unnecessary `getClientManager() != null` checks in logic and make it as similar as possible for client manager and the old approach * Use auto-reconnect where possible * Remove manual reconnect trigger and rely on events instead to know where to subscribe * Do not close the connection in adapter to be able to use reconnect logic without lose of subscriptions * Make `ClientManager` extend `MqttComponent` so that it knows about connection options as part of its contract * Remove not relevant auto test cases (relying on connection close or manual reconnect) * Other code style smaller changes
1 parent 4cc094f commit f3fba7d

17 files changed

+450
-548
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/config/xml/MqttMessageDrivenChannelAdapterParser.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,7 +48,6 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
4848
builder.addPropertyReference("outputChannel", channelName);
4949
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-channel");
5050
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "qos");
51-
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "recovery-interval");
5251
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "manual-acks");
5352

5453
return builder.getBeanDefinition();

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java

+56-4
Original file line numberDiff line numberDiff line change
@@ -16,37 +16,49 @@
1616

1717
package org.springframework.integration.mqtt.core;
1818

19+
import java.util.Collections;
20+
import java.util.HashSet;
21+
import java.util.Set;
22+
1923
import org.apache.commons.logging.Log;
2024
import org.apache.commons.logging.LogFactory;
2125

2226
import org.springframework.context.ApplicationEventPublisher;
2327
import org.springframework.context.ApplicationEventPublisherAware;
28+
import org.springframework.context.SmartLifecycle;
29+
import org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter;
2430
import org.springframework.util.Assert;
2531

2632
/**
2733
* @param <T> MQTT client type
34+
* @param <C> MQTT connection options type (v5 or v3)
2835
*
2936
* @author Artem Vozhdayenko
3037
*
3138
* @since 6.0
3239
*/
33-
public abstract class AbstractMqttClientManager<T> implements ClientManager<T>, ApplicationEventPublisherAware {
40+
public abstract class AbstractMqttClientManager<T, C> implements ClientManager<T, C>, ApplicationEventPublisherAware {
3441

3542
protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR
3643

37-
private ApplicationEventPublisher applicationEventPublisher;
44+
private final Set<ConnectCallback> connectCallbacks;
45+
46+
private final String clientId;
3847

3948
private boolean manualAcks;
4049

50+
private ApplicationEventPublisher applicationEventPublisher;
51+
4152
private String url;
4253

43-
private final String clientId;
54+
private volatile T client;
4455

45-
volatile T client;
56+
private String beanName;
4657

4758
AbstractMqttClientManager(String clientId) {
4859
Assert.notNull(clientId, "'clientId' is required");
4960
this.clientId = clientId;
61+
this.connectCallbacks = Collections.synchronizedSet(new HashSet<>());
5062
}
5163

5264
protected void setManualAcks(boolean manualAcks) {
@@ -69,6 +81,14 @@ protected ApplicationEventPublisher getApplicationEventPublisher() {
6981
return this.applicationEventPublisher;
7082
}
7183

84+
protected synchronized void setClient(T client) {
85+
this.client = client;
86+
}
87+
88+
protected Set<ConnectCallback> getCallbacks() {
89+
return Collections.unmodifiableSet(this.connectCallbacks);
90+
}
91+
7292
@Override
7393
public boolean isManualAcks() {
7494
return this.manualAcks;
@@ -85,7 +105,39 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
85105
this.applicationEventPublisher = applicationEventPublisher;
86106
}
87107

108+
@Override
109+
public void setBeanName(String name) {
110+
this.beanName = name;
111+
}
112+
113+
@Override
114+
public String getBeanName() {
115+
return this.beanName;
116+
}
117+
118+
/**
119+
* The phase of component autostart in {@link SmartLifecycle}.
120+
* If the custom one is required, note that for the correct behavior it should be less than phase of
121+
* {@link AbstractMqttMessageDrivenChannelAdapter} implementations.
122+
* @return {@link SmartLifecycle} autostart phase
123+
*/
124+
@Override
125+
public int getPhase() {
126+
return 0;
127+
}
128+
129+
@Override
130+
public void addCallback(ConnectCallback connectCallback) {
131+
this.connectCallbacks.add(connectCallback);
132+
}
133+
134+
@Override
135+
public boolean removeCallback(ConnectCallback connectCallback) {
136+
return this.connectCallbacks.remove(connectCallback);
137+
}
138+
88139
public synchronized boolean isRunning() {
89140
return this.client != null;
90141
}
142+
91143
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,29 @@
2424
* Using this manager in multiple MQTT integrations will preserve a single connection.
2525
*
2626
* @param <T> MQTT client type
27+
* @param <C> MQTT connection options type (v5 or v3)
2728
*
2829
* @author Artem Vozhdayenko
2930
*
3031
* @since 6.0
3132
*/
32-
public interface ClientManager<T> extends SmartLifecycle {
33+
public interface ClientManager<T, C> extends SmartLifecycle, MqttComponent<C> {
3334

3435
T getClient();
3536

3637
boolean isManualAcks();
3738

39+
void addCallback(ConnectCallback connectCallback);
40+
41+
boolean removeCallback(ConnectCallback connectCallback);
42+
43+
/**
44+
* A contract for a custom callback if needed by a usage.
45+
*/
46+
interface ConnectCallback {
47+
48+
void connectComplete(boolean isReconnect);
49+
50+
}
51+
3852
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java

+29-24
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
2020
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
21-
import org.eclipse.paho.client.mqttv3.MqttCallback;
21+
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
2222
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
2323
import org.eclipse.paho.client.mqttv3.MqttException;
2424
import org.eclipse.paho.client.mqttv3.MqttMessage;
@@ -30,7 +30,8 @@
3030
* @author Artem Vozhdayenko
3131
* @since 6.0
3232
*/
33-
public class Mqttv3ClientManager extends AbstractMqttClientManager<IMqttAsyncClient> implements MqttCallback {
33+
public class Mqttv3ClientManager extends AbstractMqttClientManager<IMqttAsyncClient, MqttConnectOptions>
34+
implements MqttCallbackExtended {
3435

3536
private final MqttPahoClientFactory clientFactory;
3637

@@ -65,62 +66,62 @@ private static MqttPahoClientFactory buildDefaultClientFactory(String url) {
6566

6667
@Override
6768
public synchronized void start() {
68-
if (this.client == null) {
69+
if (getClient() == null) {
6970
try {
70-
this.client = this.clientFactory.getAsyncClientInstance(getUrl(), getClientId());
71-
this.client.setManualAcks(isManualAcks());
72-
this.client.setCallback(this);
71+
var client = this.clientFactory.getAsyncClientInstance(getUrl(), getClientId());
72+
client.setManualAcks(isManualAcks());
73+
client.setCallback(this);
74+
setClient(client);
7375
}
7476
catch (MqttException e) {
7577
throw new IllegalStateException("could not start client manager", e);
7678
}
7779
}
7880
try {
7981
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
80-
this.client.connect(options).waitForCompletion(options.getConnectionTimeout());
82+
getClient().connect(options).waitForCompletion(options.getConnectionTimeout());
8183
}
8284
catch (MqttException e) {
83-
logger.error("could not start client manager, client_id=" + this.client.getClientId(), e);
84-
85-
if (this.clientFactory.getConnectionOptions().isAutomaticReconnect()) {
86-
try {
87-
this.client.reconnect();
88-
}
89-
catch (MqttException ex) {
90-
logger.error("MQTT client failed to re-connect.", ex);
91-
}
92-
}
93-
else if (getApplicationEventPublisher() != null) {
94-
getApplicationEventPublisher().publishEvent(new MqttConnectionFailedEvent(this, e));
85+
logger.error("could not start client manager, client_id=" + getClientId(), e);
86+
87+
var applicationEventPublisher = getApplicationEventPublisher();
88+
if (applicationEventPublisher != null) {
89+
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e));
9590
}
9691
}
9792
}
9893

9994
@Override
10095
public synchronized void stop() {
101-
if (this.client == null) {
96+
var client = getClient();
97+
if (client == null) {
10298
return;
10399
}
104100
try {
105-
this.client.disconnectForcibly(this.clientFactory.getConnectionOptions().getConnectionTimeout());
101+
client.disconnectForcibly(this.clientFactory.getConnectionOptions().getConnectionTimeout());
106102
}
107103
catch (MqttException e) {
108104
logger.error("could not disconnect from the client", e);
109105
}
110106
finally {
111107
try {
112-
this.client.close();
108+
client.close();
113109
}
114110
catch (MqttException e) {
115111
logger.error("could not close the client", e);
116112
}
117-
this.client = null;
113+
setClient(null);
118114
}
119115
}
120116

121117
@Override
122118
public synchronized void connectionLost(Throwable cause) {
123-
logger.error("connection lost, client_id=" + this.client.getClientId(), cause);
119+
logger.error("connection lost, client_id=" + getClientId(), cause);
120+
}
121+
122+
@Override
123+
public void connectComplete(boolean reconnect, String serverURI) {
124+
getCallbacks().forEach(callback -> callback.connectComplete(reconnect));
124125
}
125126

126127
@Override
@@ -133,4 +134,8 @@ public void deliveryComplete(IMqttDeliveryToken token) {
133134
// nor this manager concern
134135
}
135136

137+
@Override
138+
public MqttConnectOptions getConnectionInfo() {
139+
return this.clientFactory.getConnectionOptions();
140+
}
136141
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java

+23-26
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
* @author Artem Vozhdayenko
3434
* @since 6.0
3535
*/
36-
public class Mqttv5ClientManager extends AbstractMqttClientManager<IMqttAsyncClient> implements MqttCallback {
36+
public class Mqttv5ClientManager extends AbstractMqttClientManager<IMqttAsyncClient, MqttConnectionOptions>
37+
implements MqttCallback {
3738

3839
private final MqttConnectionOptions connectionOptions;
3940

@@ -64,57 +65,52 @@ private static MqttConnectionOptions buildDefaultConnectionOptions(String url) {
6465

6566
@Override
6667
public synchronized void start() {
67-
if (this.client == null) {
68+
if (getClient() == null) {
6869
try {
69-
this.client = new MqttAsyncClient(getUrl(), getClientId());
70-
this.client.setManualAcks(isManualAcks());
71-
this.client.setCallback(this);
70+
var client = new MqttAsyncClient(getUrl(), getClientId());
71+
client.setManualAcks(isManualAcks());
72+
client.setCallback(this);
73+
setClient(client);
7274
}
7375
catch (MqttException e) {
7476
throw new IllegalStateException("could not start client manager", e);
7577
}
7678
}
7779
try {
78-
this.client.connect(this.connectionOptions)
80+
getClient().connect(this.connectionOptions)
7981
.waitForCompletion(this.connectionOptions.getConnectionTimeout());
8082
}
8183
catch (MqttException e) {
82-
logger.error("could not start client manager, client_id=" + this.client.getClientId(), e);
83-
84-
if (this.connectionOptions.isAutomaticReconnect()) {
85-
try {
86-
this.client.reconnect();
87-
}
88-
catch (MqttException ex) {
89-
logger.error("MQTT client failed to re-connect.", ex);
90-
}
91-
}
92-
else if (getApplicationEventPublisher() != null) {
93-
getApplicationEventPublisher().publishEvent(new MqttConnectionFailedEvent(this, e));
84+
logger.error("could not start client manager, client_id=" + getClientId(), e);
85+
86+
var applicationEventPublisher = getApplicationEventPublisher();
87+
if (applicationEventPublisher != null) {
88+
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e));
9489
}
9590
}
9691
}
9792

9893
@Override
9994
public synchronized void stop() {
100-
if (this.client == null) {
95+
var client = getClient();
96+
if (client == null) {
10197
return;
10298
}
10399

104100
try {
105-
this.client.disconnectForcibly(this.connectionOptions.getConnectionTimeout());
101+
client.disconnectForcibly(this.connectionOptions.getConnectionTimeout());
106102
}
107103
catch (MqttException e) {
108104
logger.error("could not disconnect from the client", e);
109105
}
110106
finally {
111107
try {
112-
this.client.close();
108+
client.close();
113109
}
114110
catch (MqttException e) {
115111
logger.error("could not close the client", e);
116112
}
117-
this.client = null;
113+
setClient(null);
118114
}
119115
}
120116

@@ -130,10 +126,7 @@ public void deliveryComplete(IMqttToken token) {
130126

131127
@Override
132128
public void connectComplete(boolean reconnect, String serverURI) {
133-
if (logger.isInfoEnabled()) {
134-
logger.info("MQTT connect complete to " + serverURI);
135-
}
136-
// probably makes sense to use custom callbacks in the future
129+
getCallbacks().forEach(callback -> callback.connectComplete(reconnect));
137130
}
138131

139132
@Override
@@ -153,4 +146,8 @@ public void mqttErrorOccurred(MqttException exception) {
153146
logger.error("MQTT error occurred", exception);
154147
}
155148

149+
@Override
150+
public MqttConnectionOptions getConnectionInfo() {
151+
return this.connectionOptions;
152+
}
156153
}

0 commit comments

Comments
 (0)