Skip to content

Share MQTT connection between inbound & outbound IntegrationFlows #3685

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
durimkryeziu opened this issue Nov 29, 2021 · 7 comments
Closed

Comments

@durimkryeziu
Copy link

durimkryeziu commented Nov 29, 2021

Expected Behavior
If there are two IntegrationFlows that use the same ClientFactory, to be able to share the same MQTT connection, hence being able to publish and subscribe with the same client id

Current Behavior
When I create two IntegartionFlows to publish & subscribe to an MQTT Broker, first the subscribe (inbound) flow connects and subscribes successfully to the given topic. Then when a message is published to a topic, the previous connection drops so the publishing (outbound) flow connects!

Relevant logs:

DEBUG 8950 --- [           main] .m.i.MqttPahoMessageDrivenChannelAdapter : Connected and subscribed to [notified]
 INFO 8950 --- [           main] c.e.m.MqttSharedConnectionApplication    : Publishing a message to MQTT Broker...
DEBUG 8950 --- [           main] o.s.i.m.outbound.MqttPahoMessageHandler  : Client connected
ERROR 8950 --- [c: sameClientId] .m.i.MqttPahoMessageDrivenChannelAdapter : Lost connection: Connection lost; retrying...
DEBUG 8950 --- [   scheduling-1] .m.i.MqttPahoMessageDrivenChannelAdapter : Attempting reconnect
ERROR 8950 --- [c: sameClientId] o.s.i.m.outbound.MqttPahoMessageHandler  : Lost connection; will attempt reconnect on next request
DEBUG 8950 --- [   scheduling-1] .m.i.MqttPahoMessageDrivenChannelAdapter : Connected and subscribed to [notified]

So basically they ping-pong with each other as MQTT allows only one connection per client id

Context
My use-case is that I have to use the same client id both for subscribing and publishing messages to the MQTT broker

Sample project: https://github.com/durimkryeziu/mqtt-shared-connection

@durimkryeziu durimkryeziu added status: waiting-for-triage The issue need to be evaluated and its future decided type: enhancement labels Nov 29, 2021
@artembilan
Copy link
Member

This request is not OK in the context of those channel adapters.
They do this:

this.client = this.clientFactory.getClientInstance(getUrl(), getClientId());
this.client.setCallback(this);

And that callback is really unique for each channel adapter.
Even if we could have a shared IMqttClient instance between channel adapters, we still cannot have a single MqttCallback according an internal logic of MqttPahoMessageDrivenChannelAdapter and MqttPahoMessageHandler.
And now imaging if you would like to share the same client between many similar channel adapters: the callback behavior would become more messy...

So, it is probably OK to have the same client for publishing an subscribing logic in the same place: https://stackoverflow.com/questions/49484629/mqtt-publish-subscribe-in-the-same-client.
But only when we do everything manually.
If we try to rely on some components from some library, it is really hard to be consistent with the shared objects.

How about having a client id as prefixed: sameClientId.subscriber and sameClientId.publisher?
So, you still will be able to distinguish connected client on your broker.

@artembilan artembilan added status: waiting-for-reporter Needs a feedback from the reporter and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Nov 29, 2021
@durimkryeziu
Copy link
Author

Hi @artembilan, thanks for your fast response :)

Thanks, I checked that StackOverflow answer but I'd like to achieve it using the APIs of Spring Integration, as we have other integrations implemented with it on that project!

Having client id prefixed... well then it means 2 MQTT connections for each client, which is a waste of resources, isn't it?

@artembilan
Copy link
Member

As I said: it was designed like that to keep the proper track of MqttCallback.
We may revise this in the upcoming 6.0 generation: I see there is an API like this:

 void subscribe(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) throws MqttException;

So, for every new subscriber we can have its own listening context while keeping the rest of MqttCallback in the central place, e.g. MqttClientManager.
Probably something similar what we have so far with the StompSessionManager or ClientWebSocketContainer.
As far as I'm concerned I see that on publishing side the MqttClient doesn't have a state, but just gives us a DeliveryToken to track publication.

So, yeah: let's see what we can do so far in the next version!

For now you only have a choice or use different clients or implement a logic manually.

@artembilan artembilan added in: mqtt and removed status: waiting-for-reporter Needs a feedback from the reporter labels Nov 29, 2021
@artembilan artembilan added this to the 6.0.x milestone Nov 29, 2021
@Dacesilian
Copy link

Hello, I would like to ask if there is any solution to this, is it? MQTT allows only one connection per clientId, so my client gets disconnected. Thank you.

@artembilan
Copy link
Member

@Dacesilian ,

we definitely are looking into this, but since there are expected some breaking changes, the fix is likely would make it only to the next 6.0 version which GA is only due this fall.

As a workaround consider to do such an MQTT logic manually:

  1. Use subscribe() in your code and call some @MessagingGateway from the listener callback.
  2. The publish() could be used from any POJO method which then can be just referenced from the service activator.

oxcafedead added a commit to oxcafedead/spring-integration that referenced this issue Jul 21, 2022
Fixes spring-projects#3685

Introduce some initial design.
Add a new interface `MqttClientManager` which will manage clients and
connections.
Use this manager in v3 topic adapter and message handler.
oxcafedead added a commit to oxcafedead/spring-integration that referenced this issue Jul 26, 2022
Fixes spring-projects#3685

Introduce some initial design.
Add a new interface `ClientManager` which will manage clients and
connections.
Use this manager in v3 topic adapter and message handler.
oxcafedead added a commit to oxcafedead/spring-integration that referenced this issue Jul 27, 2022
Fixes spring-projects#3685

Add a new interface `ClientManager` which will manage clients and
connections.
Add different implementations for v3 and v5 MQTT clients.
Use this manager in v3/v5 topic adapters and message handlers.
oxcafedead added a commit to oxcafedead/spring-integration that referenced this issue Jul 28, 2022
Fixes spring-projects#3685

Add a couple of unit/integration tests to cover client manager usage.
Several small code improvements after the code review:
* Improve client manager usage via providing several mutual exclusive
constructors, whether the users provides `url` or `connectionOptions`
or `clientFactory` for v3.
* Move the logger to `AbstractMqttClientManager`
* Do not inject TaskScheduler in constructor for v3 client manager
but use lazy init via `BeanFactory` and `IntegrationContextUtils`
* Other smaller code readability improvements
oxcafedead added a commit to oxcafedead/spring-integration that referenced this issue Jul 29, 2022
Fixes spring-projects#3685

Introduce some initial design.
Add a new interface `ClientManager` which will manage clients and
connections.
Use this manager in v3 topic adapter and message handler.
oxcafedead added a commit to oxcafedead/spring-integration that referenced this issue Jul 29, 2022
Fixes spring-projects#3685

Add a new interface `ClientManager` which will manage clients and
connections.
Add different implementations for v3 and v5 MQTT clients.
Use this manager in v3/v5 topic adapters and message handlers.
oxcafedead added a commit to oxcafedead/spring-integration that referenced this issue Jul 29, 2022
Fixes spring-projects#3685

Add a couple of unit/integration tests to cover client manager usage.
Several small code improvements after the code review:
* Improve client manager usage via providing several mutual exclusive
constructors, whether the users provides `url` or `connectionOptions`
or `clientFactory` for v3.
* Move the logger to `AbstractMqttClientManager`
* Do not inject TaskScheduler in constructor for v3 client manager
but use lazy init via `BeanFactory` and `IntegrationContextUtils`
* Other smaller code readability improvements
oxcafedead added a commit to oxcafedead/spring-integration that referenced this issue Jul 29, 2022
Fixes spring-projects#3685

Add new tests with reconnect cases.
Other code improvements after the code review:
* Adjust javadocs according to standards
* Remove `setClientManager` and use exclusive ctors
* Make automatic reconnects using the v3 client instead of manually
using task scheduler
oxcafedead added a commit to oxcafedead/spring-integration that referenced this issue Jul 31, 2022
Fixes spring-projects#3685

Some fixes and improvements after another code review iteration:
* Rearrange the code according to the code style guides
* Move client instance to `AbstractClientManager` with `isRunning`
method
* Fix abstract adapter/handler fields visibility and `final`ize them
where we can
* Send application event if automatic reconnect is not enabled for the
client manager
oxcafedead added a commit to oxcafedead/spring-integration that referenced this issue Aug 6, 2022
Fixes spring-projects#3685

Other fixes and improvements after code review:
* Changes around fields, methods, ctors visibility
* Removed contradictory ctors
* Reduce amount of unnecessary `getClientManager() != null` checks
in logic and make it as similar as possible for client manager and the
old approach
* Use auto-reconnect where possible
* Remove manual reconnect trigger and rely on events instead to know
where to subscribe
* Do not close the connection in adapter to be able to use reconnect
logic without lose of subscriptions
* Make `ClientManager` extend `MqttComponent` so that it knows about
connection options as part of its contract
* Remove not relevant auto test cases (relying on connection close or
manual reconnect)
* Other code style smaller changes
oxcafedead added a commit to oxcafedead/spring-integration that referenced this issue Aug 9, 2022
Fixes spring-projects#3685

Other fixes and improvements after code review:
* Get manual `reconnect` invocation back for v3/v5 adapters and client
managers (see bug spring-projectsGH-3822 for a reasoning)
* Remove unnecessary getters/setter for a listener and use adapter
class as listener instead
* Optimize MessageListener: remove redundant inner class and use a
single method reference instead of N instances per each subscribe
* Javadocs improvements
@artembilan artembilan modified the milestones: 6.0.x, 6.0.0-M5 Aug 11, 2022
artembilan pushed a commit that referenced this issue Aug 16, 2022
Related to #3685

* Add documentation for a new MQTT shared client feature

Add an overview with reason for the feature as well as basic
capabilities listing. Give an example with Java DSL usage for several
adapters.

* Fill `whats-new.adoc` with MQTT changes

Add a reference to MQTT documentation with info about shared MQTT client

* Couple of code review changes
@Yuesheng321
Copy link

Hello, How can I solve it in spring-integration 5.5.14?

@artembilan
Copy link
Member

In that version you can do that only implementing your own channel adapters.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants