Skip to content

Commit 3251fd2

Browse files
committed
spring-projectsGH-3685: Share MQTT connection across components
Fixes spring-projects#3685 Introduce some initial design. Add a new interface `ClientManager` which will manage clients and connections. Use this manager in v3 topic adapter and message handler.
1 parent 5a178de commit 3251fd2

File tree

5 files changed

+264
-30
lines changed

5 files changed

+264
-30
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2022-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt.core;
18+
19+
import org.springframework.integration.support.management.ManageableLifecycle;
20+
21+
public interface ClientManager<T> extends ManageableLifecycle {
22+
23+
T getClient();
24+
25+
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Copyright 2022-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt.core;
18+
19+
import java.time.Instant;
20+
import java.util.concurrent.ScheduledFuture;
21+
import java.util.concurrent.atomic.AtomicReference;
22+
23+
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
24+
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
25+
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
26+
import org.eclipse.paho.client.mqttv3.MqttCallback;
27+
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
28+
import org.eclipse.paho.client.mqttv3.MqttException;
29+
import org.eclipse.paho.client.mqttv3.MqttMessage;
30+
31+
import org.springframework.integration.context.IntegrationObjectSupport;
32+
import org.springframework.integration.support.management.ManageableLifecycle;
33+
34+
public class Mqttv3ClientManager extends IntegrationObjectSupport implements ClientManager<IMqttAsyncClient>,
35+
ManageableLifecycle, MqttCallback {
36+
37+
private AtomicReference<ScheduledFuture<?>> scheduledReconnect;
38+
39+
private final MqttConnectOptions connectOptions;
40+
41+
private final String clientId;
42+
43+
private IMqttAsyncClient client;
44+
45+
public Mqttv3ClientManager(MqttConnectOptions connectOptions, String clientId) throws MqttException {
46+
this.connectOptions = connectOptions;
47+
this.client = new MqttAsyncClient(connectOptions.getServerURIs()[0], clientId);
48+
this.client.setCallback(this);
49+
this.clientId = clientId;
50+
}
51+
52+
@Override
53+
public IMqttAsyncClient getClient() {
54+
return client;
55+
}
56+
57+
@Override
58+
public void start() {
59+
if (this.client == null) {
60+
try {
61+
this.client = new MqttAsyncClient(this.connectOptions.getServerURIs()[0], this.clientId);
62+
}
63+
catch (MqttException e) {
64+
throw new IllegalStateException("could not start client manager", e);
65+
}
66+
this.client.setCallback(this);
67+
}
68+
try {
69+
connect();
70+
}
71+
catch (MqttException e) {
72+
logger.error(e, "could not start client manager, scheduling reconnect, client_id=" +
73+
this.client.getClientId());
74+
scheduleReconnect();
75+
}
76+
}
77+
78+
@Override
79+
public void stop() {
80+
if (this.client == null) {
81+
return;
82+
}
83+
try {
84+
this.client.disconnectForcibly(this.connectOptions.getConnectionTimeout());
85+
}
86+
catch (MqttException e) {
87+
logger.error(e, "could not disconnect from the client");
88+
}
89+
finally {
90+
try {
91+
this.client.close();
92+
}
93+
catch (MqttException e) {
94+
logger.error(e, "could not close the client");
95+
}
96+
this.client = null;
97+
}
98+
}
99+
100+
@Override
101+
public boolean isRunning() {
102+
return this.client != null;
103+
}
104+
105+
private synchronized void connect() throws MqttException {
106+
if (this.client == null) {
107+
logger.error("could not connect on a null client reference");
108+
return;
109+
}
110+
MqttConnectOptions options = Mqttv3ClientManager.this.connectOptions;
111+
this.client.connect(options).waitForCompletion(options.getConnectionTimeout());
112+
}
113+
114+
@Override
115+
public synchronized void connectionLost(Throwable cause) {
116+
logger.error(cause, "connection lost, scheduling reconnect, client_id=" + this.client.getClientId());
117+
scheduleReconnect();
118+
}
119+
120+
private void scheduleReconnect() {
121+
if (this.scheduledReconnect.get() != null) {
122+
this.scheduledReconnect.get().cancel(false);
123+
}
124+
this.scheduledReconnect.set(getTaskScheduler().schedule(() -> {
125+
try {
126+
connect();
127+
this.scheduledReconnect.set(null);
128+
}
129+
catch (MqttException e) {
130+
logger.error(e, "could not reconnect");
131+
scheduleReconnect();
132+
}
133+
}, Instant.now().plusSeconds(10)));
134+
}
135+
136+
@Override
137+
public void messageArrived(String topic, MqttMessage message) {
138+
// not this manager concern
139+
}
140+
141+
@Override
142+
public void deliveryComplete(IMqttDeliveryToken token) {
143+
// nor this manager concern
144+
}
145+
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.springframework.context.ApplicationEventPublisherAware;
2727
import org.springframework.core.log.LogMessage;
2828
import org.springframework.integration.endpoint.MessageProducerSupport;
29+
import org.springframework.integration.mqtt.core.ClientManager;
2930
import org.springframework.integration.mqtt.support.MqttMessageConverter;
3031
import org.springframework.integration.support.management.IntegrationManagedResource;
3132
import org.springframework.jmx.export.annotation.ManagedAttribute;
@@ -38,6 +39,8 @@
3839
/**
3940
* Abstract class for MQTT Message-Driven Channel Adapters.
4041
*
42+
* @param <T> MQTT Client type
43+
*
4144
* @author Gary Russell
4245
* @author Artem Bilan
4346
* @author Trung Pham
@@ -48,7 +51,7 @@
4851
*/
4952
@ManagedResource
5053
@IntegrationManagedResource
51-
public abstract class AbstractMqttMessageDrivenChannelAdapter extends MessageProducerSupport
54+
public abstract class AbstractMqttMessageDrivenChannelAdapter<T> extends MessageProducerSupport
5255
implements ApplicationEventPublisherAware {
5356

5457
/**
@@ -70,6 +73,8 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter extends MessagePro
7073

7174
private MqttMessageConverter converter;
7275

76+
protected ClientManager<T> clientManager;
77+
7378
protected final Lock topicLock = new ReentrantLock(); // NOSONAR
7479

7580
public AbstractMqttMessageDrivenChannelAdapter(@Nullable String url, String clientId, String... topic) {
@@ -89,6 +94,15 @@ public void setConverter(MqttMessageConverter converter) {
8994
this.converter = converter;
9095
}
9196

97+
public void setClientManager(ClientManager<T> clientManager) {
98+
Assert.notNull(clientManager, "'clientManager' cannot be null");
99+
this.clientManager = clientManager;
100+
}
101+
102+
public ClientManager<T> getClientManager() {
103+
return this.clientManager;
104+
}
105+
92106
/**
93107
* Set the QoS for each topic; a single value will apply to all topics otherwise
94108
* the correct number of qos values must be provided.

0 commit comments

Comments
 (0)