Skip to content

Commit e7c0d8d

Browse files
GH-3432: Add MQTT v5 channel adapters (#3639)
* GH-3432: Add MQTT v5 channel adapters Fixes #3432 * Add `optional` dependency for `org.eclipse.paho:org.eclipse.paho.mqttv5.client` * Add `MqttProtocolErrorEvent` and emit it from the `mqttErrorOccurred()` callback of the MQTT v5 client * Add `MqttHeaderMapper` since MQTT v5 has introduced user properties pair to transfer over the protocol * Add `Mqttv5PahoMessageHandler` as one more extension of the `AbstractMqttMessageHandler` * Add more convenient `MqttHeaders` constants for easier headers mapping configuration * Ensure via `Mqttv5BackToBackTests` that MQTT v5 is supported by the provided components * Change `pr-build-workflow.yml` to use `eclipse-mosquitto` container for testing all the MQTT interactions * Change `cyrilix/rabbitmq-mqtt` service to the `rabbitmq:management` since RabbitMQ does not support MQTT v5 * * Handle manual acks * Add `Mqttv5PahoMessageDrivenChannelAdapter.persistence` property * * Add documentation * Add `MosquittoContainerTest` for TestContainers support with Mosquitto image * Fix language in the docs after review Co-authored-by: Gary Russell <[email protected]> Co-authored-by: Gary Russell <[email protected]>
1 parent b8e414d commit e7c0d8d

15 files changed

+1401
-138
lines changed

Diff for: build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ configure(javaProjects) { subproject ->
251251
testImplementation 'org.jetbrains.kotlin:kotlin-reflect'
252252
testImplementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
253253
testImplementation 'io.projectreactor:reactor-test'
254+
testImplementation 'org.testcontainers:junit-jupiter:1.16.0'
254255

255256
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
256257
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
@@ -680,6 +681,7 @@ project('spring-integration-mqtt') {
680681
dependencies {
681682
api project(':spring-integration-core')
682683
api "org.eclipse.paho:org.eclipse.paho.client.mqttv3:$pahoMqttClientVersion"
684+
optionalApi "org.eclipse.paho:org.eclipse.paho.mqttv5.client:$pahoMqttClientVersion"
683685

684686
testImplementation project(':spring-integration-jmx')
685687
testImplementation 'com.fasterxml.jackson.core:jackson-databind'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt.event;
18+
19+
import org.eclipse.paho.mqttv5.common.MqttException;
20+
21+
/**
22+
* The even representing an MQTT error occured during client interaction.
23+
*
24+
* @author Artem Bilan
25+
*
26+
* @since 5.5.5
27+
*
28+
* @see org.eclipse.paho.mqttv5.client.MqttCallback#mqttErrorOccurred(MqttException)
29+
*/
30+
@SuppressWarnings("serial")
31+
public class MqttProtocolErrorEvent extends MqttIntegrationEvent {
32+
33+
public MqttProtocolErrorEvent(Object source, MqttException exception) {
34+
super(source, exception);
35+
}
36+
37+
}

Diff for: spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java

+52-15
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121
import java.util.concurrent.locks.Lock;
2222
import java.util.concurrent.locks.ReentrantLock;
2323

24+
import org.springframework.context.ApplicationEventPublisher;
25+
import org.springframework.context.ApplicationEventPublisherAware;
2426
import org.springframework.core.log.LogMessage;
2527
import org.springframework.integration.endpoint.MessageProducerSupport;
26-
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
2728
import org.springframework.integration.mqtt.support.MqttMessageConverter;
2829
import org.springframework.integration.support.management.IntegrationManagedResource;
2930
import org.springframework.jmx.export.annotation.ManagedAttribute;
@@ -45,15 +46,27 @@
4546
*/
4647
@ManagedResource
4748
@IntegrationManagedResource
48-
public abstract class AbstractMqttMessageDrivenChannelAdapter extends MessageProducerSupport {
49+
public abstract class AbstractMqttMessageDrivenChannelAdapter extends MessageProducerSupport
50+
implements ApplicationEventPublisherAware {
51+
52+
/**
53+
* The default completion timeout in milliseconds.
54+
*/
55+
public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L;
4956

5057
private final String url;
5158

5259
private final String clientId;
5360

5461
private final Set<Topic> topics;
5562

56-
private volatile MqttMessageConverter converter;
63+
private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
64+
65+
private boolean manualAcks;
66+
67+
private ApplicationEventPublisher applicationEventPublisher;
68+
69+
private MqttMessageConverter converter;
5770

5871
protected final Lock topicLock = new ReentrantLock(); // NOSONAR
5972

@@ -147,6 +160,42 @@ public String getComponentType() {
147160
return "mqtt:inbound-channel-adapter";
148161
}
149162

163+
@Override
164+
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
165+
this.applicationEventPublisher = applicationEventPublisher; // NOSONAR (inconsistent synchronization)
166+
}
167+
168+
protected ApplicationEventPublisher getApplicationEventPublisher() {
169+
return this.applicationEventPublisher;
170+
}
171+
172+
/**
173+
* Set the acknowledgment mode to manual.
174+
* @param manualAcks true for manual acks.
175+
* @since 5.3
176+
*/
177+
public void setManualAcks(boolean manualAcks) {
178+
this.manualAcks = manualAcks;
179+
}
180+
181+
protected boolean isManualAcks() {
182+
return this.manualAcks;
183+
}
184+
185+
/**
186+
* Set the completion timeout for operations. Not settable using the namespace.
187+
* Default {@value #DEFAULT_COMPLETION_TIMEOUT} milliseconds.
188+
* @param completionTimeout The timeout.
189+
* @since 4.1
190+
*/
191+
public void setCompletionTimeout(long completionTimeout) {
192+
this.completionTimeout = completionTimeout;
193+
}
194+
195+
protected long getCompletionTimeout() {
196+
return this.completionTimeout;
197+
}
198+
150199
/**
151200
* Add a topic to the subscribed list.
152201
* @param topic The topic.
@@ -239,18 +288,6 @@ public void removeTopic(String... topic) {
239288
}
240289
}
241290

242-
@Override
243-
protected void onInit() {
244-
super.onInit();
245-
if (this.converter == null) {
246-
DefaultPahoMessageConverter pahoMessageConverter = new DefaultPahoMessageConverter();
247-
pahoMessageConverter.setBeanFactory(getBeanFactory());
248-
this.converter = pahoMessageConverter;
249-
250-
}
251-
}
252-
253-
254291
/**
255292
* @since 4.1
256293
*/

Diff for: spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

+32-54
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.eclipse.paho.client.mqttv3.MqttMessage;
3030

3131
import org.springframework.context.ApplicationEventPublisher;
32-
import org.springframework.context.ApplicationEventPublisherAware;
3332
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3433
import org.springframework.integration.acks.SimpleAcknowledgment;
3534
import org.springframework.integration.mqtt.core.ConsumerStopAction;
@@ -38,6 +37,7 @@
3837
import org.springframework.integration.mqtt.core.MqttPahoComponent;
3938
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
4039
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
40+
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
4141
import org.springframework.integration.mqtt.support.MqttUtils;
4242
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
4343
import org.springframework.messaging.Message;
@@ -60,12 +60,7 @@
6060
*
6161
*/
6262
public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDrivenChannelAdapter
63-
implements MqttCallback, MqttPahoComponent, ApplicationEventPublisherAware {
64-
65-
/**
66-
* The default completion timeout in milliseconds.
67-
*/
68-
public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L;
63+
implements MqttCallback, MqttPahoComponent {
6964

7065
/**
7166
* The default disconnect completion timeout in milliseconds.
@@ -78,14 +73,8 @@ public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDriv
7873

7974
private int recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
8075

81-
private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
82-
8376
private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT;
8477

85-
private boolean manualAcks;
86-
87-
private ApplicationEventPublisher applicationEventPublisher;
88-
8978
private volatile IMqttClient client;
9079

9180
private volatile ScheduledFuture<?> reconnectFuture;
@@ -139,16 +128,6 @@ public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String..
139128
this(url, clientId, new DefaultMqttPahoClientFactory(), topic);
140129
}
141130

142-
/**
143-
* Set the completion timeout for operations. Not settable using the namespace.
144-
* Default {@value #DEFAULT_COMPLETION_TIMEOUT} milliseconds.
145-
* @param completionTimeout The timeout.
146-
* @since 4.1
147-
*/
148-
public synchronized void setCompletionTimeout(long completionTimeout) {
149-
this.completionTimeout = completionTimeout;
150-
}
151-
152131
/**
153132
* Set the completion timeout when disconnecting. Not settable using the namespace.
154133
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
@@ -169,23 +148,6 @@ public synchronized void setRecoveryInterval(int recoveryInterval) {
169148
this.recoveryInterval = recoveryInterval;
170149
}
171150

172-
/**
173-
* Set the acknowledgment mode to manual.
174-
* @param manualAcks true for manual acks.
175-
* @since 5.3
176-
*/
177-
public void setManualAcks(boolean manualAcks) {
178-
this.manualAcks = manualAcks;
179-
}
180-
181-
/**
182-
* @since 4.2.2
183-
*/
184-
@Override
185-
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
186-
this.applicationEventPublisher = applicationEventPublisher; // NOSONAR (inconsistent synchronization)
187-
}
188-
189151
@Override
190152
public MqttConnectOptions getConnectionInfo() {
191153
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
@@ -199,6 +161,17 @@ public MqttConnectOptions getConnectionInfo() {
199161
return options;
200162
}
201163

164+
@Override
165+
protected void onInit() {
166+
super.onInit();
167+
if (getConverter() == null) {
168+
DefaultPahoMessageConverter pahoMessageConverter = new DefaultPahoMessageConverter();
169+
pahoMessageConverter.setBeanFactory(getBeanFactory());
170+
setConverter(pahoMessageConverter);
171+
172+
}
173+
}
174+
202175
@Override
203176
protected void doStart() {
204177
Assert.state(getTaskScheduler() != null, "A 'taskScheduler' is required");
@@ -293,22 +266,26 @@ private synchronized void connectAndSubscribe() throws MqttException {
293266
this.client = this.clientFactory.getClientInstance(getUrl(), getClientId());
294267
this.client.setCallback(this);
295268
if (this.client instanceof MqttClient) {
296-
((MqttClient) this.client).setTimeToWait(this.completionTimeout);
269+
((MqttClient) this.client).setTimeToWait(getCompletionTimeout());
297270
}
298271

299272
this.topicLock.lock();
300273
String[] topics = getTopic();
274+
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
301275
try {
302276
this.client.connect(connectionOptions);
303-
this.client.setManualAcks(this.manualAcks);
304-
int[] requestedQos = getQos();
305-
int[] grantedQos = Arrays.copyOf(requestedQos, requestedQos.length);
306-
this.client.subscribe(topics, grantedQos);
307-
warnInvalidQosForSubscription(topics, requestedQos, grantedQos);
277+
this.client.setManualAcks(isManualAcks());
278+
if (topics.length > 0) {
279+
int[] requestedQos = getQos();
280+
int[] grantedQos = Arrays.copyOf(requestedQos, requestedQos.length);
281+
this.client.subscribe(topics, grantedQos);
282+
warnInvalidQosForSubscription(topics, requestedQos, grantedQos);
283+
}
308284
}
309285
catch (MqttException ex) {
310-
if (this.applicationEventPublisher != null) {
311-
this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
286+
287+
if (applicationEventPublisher != null) {
288+
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
312289
}
313290
logger.error(ex, () -> "Error connecting or subscribing to " + Arrays.toString(topics));
314291
if (this.client != null) { // Could be reset during event handling before
@@ -331,8 +308,8 @@ private synchronized void connectAndSubscribe() throws MqttException {
331308
this.connected = true;
332309
String message = "Connected and subscribed to " + Arrays.toString(topics);
333310
logger.debug(message);
334-
if (this.applicationEventPublisher != null) {
335-
this.applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
311+
if (applicationEventPublisher != null) {
312+
applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
336313
}
337314
}
338315
}
@@ -397,8 +374,9 @@ public synchronized void connectionLost(Throwable cause) {
397374
}
398375
this.client = null;
399376
scheduleReconnect();
400-
if (this.applicationEventPublisher != null) {
401-
this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
377+
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
378+
if (applicationEventPublisher != null) {
379+
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
402380
}
403381
}
404382
}
@@ -407,7 +385,7 @@ public synchronized void connectionLost(Throwable cause) {
407385
public void messageArrived(String topic, MqttMessage mqttMessage) {
408386
AbstractIntegrationMessageBuilder<?> builder = toMessageBuilder(topic, mqttMessage);
409387
if (builder != null) {
410-
if (this.manualAcks) {
388+
if (isManualAcks()) {
411389
builder.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
412390
new AcknowledgmentImpl(mqttMessage.getId(), mqttMessage.getQos(), this.client));
413391
}
@@ -458,7 +436,7 @@ public void deliveryComplete(IMqttDeliveryToken token) {
458436
}
459437

460438
/**
461-
* Used to complete message arrival when {@link #manualAcks} is true.
439+
* Used to complete message arrival when {@link #isManualAcks()} is true.
462440
*
463441
* @since 5.3
464442
*/

0 commit comments

Comments
 (0)