Mqttv5PahoMessageDrivenChannelAdapter.addTopic(topic, qos) throws NPE #3732

Closed
mrpiggi opened this issue Feb 25, 2022 · 9 comments
Labels
ideal-for-user-contribution An issue that would ideal for a user to get started with contributing. in: mqtt type: bug

@mrpiggi
Contributor

mrpiggi commented Feb 25, 2022

In what version(s) of Spring Integration are you seeing this issue?

5.5.9 and prior (5.5.5)

Describe the bug

Creating a new Mqttv5PahoMessageDrivenChannelAdapter and calling addTopic(topic, qos) before onInit() has run throws an NPE, because this.mqttClient is still null when this.mqttClient.subscribe(topic, qos) is invoked. The same applies to Mqttv5PahoMessageDrivenChannelAdapter.removeTopic(String...) and this.mqttClient.unsubscribe(topic).

To Reproduce

final Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter(options, id);
adapter.addTopic(topic, 1);

Workaround

final Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter(options, id, topic);
adapter.setQos(1);

Expected behavior

When the adapter is not initialized yet, topics should be registered or removed without calling IMqttAsyncClient.subscribe(topic, qos) or IMqttAsyncClient.unsubscribe(String...).
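
To illustrate the expected behavior without depending on Paho, here is a minimal sketch. It is not the Spring Integration code: `FakeClient`, `GuardedAdapter` and `init()` are hypothetical stand-ins for the Paho client, the adapter and `onInit()`, and subscribing the already registered topics inside `init()` is an assumption made to keep the example short.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Paho client; it only records subscriptions.
class FakeClient {
    final List<String> subscribed = new ArrayList<>();

    boolean isConnected() {
        return true;
    }

    void subscribe(String topic, int qos) {
        subscribed.add(topic);
    }

    void unsubscribe(String topic) {
        subscribed.remove(topic);
    }
}

// Hypothetical adapter: topics are always registered locally, and the client
// is only touched once it exists and is connected.
class GuardedAdapter {
    final Map<String, Integer> topics = new LinkedHashMap<>();
    FakeClient client; // null until init(), like mqttClient before onInit()

    void init() {
        client = new FakeClient();
        // Assumption: topics registered before initialization are subscribed here.
        topics.forEach(client::subscribe);
    }

    void addTopic(String topic, int qos) {
        topics.put(topic, qos);
        if (client != null && client.isConnected()) {
            client.subscribe(topic, qos);
        }
    }

    void removeTopic(String topic) {
        if (client != null && client.isConnected()) {
            client.unsubscribe(topic);
        }
        topics.remove(topic);
    }
}
```

With this guard, calling `addTopic("some/topic", 1)` on a fresh `GuardedAdapter` only records the topic, and the subscription happens later once the client exists.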

Comparing Mqttv5PahoMessageDrivenChannelAdapter (first snippet) with MqttPahoMessageDrivenChannelAdapter (second snippet) makes the difference obvious.

// Mqttv5PahoMessageDrivenChannelAdapter: dereferences this.mqttClient
// unconditionally, which fails with an NPE before onInit() has run.
@Override
public void addTopic(String topic, int qos) {
    this.topicLock.lock();
    try {
        this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout());
        super.addTopic(topic, qos);
    }
    catch (MqttException ex) {
        throw new MessagingException("Failed to subscribe to topic " + topic, ex);
    }
    finally {
        this.topicLock.unlock();
    }
}

@Override
public void removeTopic(String... topic) {
    this.topicLock.lock();
    try {
        this.mqttClient.unsubscribe(topic).waitForCompletion(getCompletionTimeout());
        super.removeTopic(topic);
    }
    catch (MqttException ex) {
        throw new MessagingException("Failed to unsubscribe from topic(s) " + Arrays.toString(topic), ex);
    }
    finally {
        this.topicLock.unlock();
    }
}

// MqttPahoMessageDrivenChannelAdapter: only talks to the client when it
// exists and is connected, so it is safe to call before initialization.
@Override
public void addTopic(String topic, int qos) {
    this.topicLock.lock();
    try {
        super.addTopic(topic, qos);
        if (this.client != null && this.client.isConnected()) {
            this.client.subscribe(topic, qos);
        }
    }
    catch (MqttException e) {
        super.removeTopic(topic);
        throw new MessagingException("Failed to subscribe to topic " + topic, e);
    }
    finally {
        this.topicLock.unlock();
    }
}

@Override
public void removeTopic(String... topic) {
    this.topicLock.lock();
    try {
        if (this.client != null && this.client.isConnected()) {
            this.client.unsubscribe(topic);
        }
        super.removeTopic(topic);
    }
    catch (MqttException e) {
        throw new MessagingException("Failed to unsubscribe from topic(s) " + Arrays.toString(topic), e);
    }
    finally {
        this.topicLock.unlock();
    }
}

@mrpiggi mrpiggi added status: waiting-for-triage The issue need to be evaluated and its future decided type: bug labels Feb 25, 2022
@artembilan artembilan added in: mqtt backport 5.5.x and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Feb 28, 2022
@artembilan artembilan added this to the 6.0 M2 milestone Feb 28, 2022
@artembilan
Member

Confirmed.

Feel free to contribute the fix: https://github.com/spring-projects/spring-integration/blob/main/CONTRIBUTING.adoc

Thank you!

@artembilan artembilan added the ideal-for-user-contribution An issue that would ideal for a user to get started with contributing. label Mar 1, 2022
mipo256 pushed a commit to mipo256/spring-integration that referenced this issue Mar 14, 2022
mipo256 added a commit to mipo256/spring-integration that referenced this issue Mar 15, 2022
mipo256 added a commit to mipo256/spring-integration that referenced this issue Mar 17, 2022
mipo256 pushed a commit to mipo256/spring-integration that referenced this issue Mar 17, 2022
mipo256 pushed a commit to mipo256/spring-integration that referenced this issue Mar 18, 2022
artembilan pushed a commit that referenced this issue Mar 18, 2022
Fixes #3732

The `Mqttv5PahoMessageDrivenChannelAdapter` unconditionally tries to (un)subscribe
to/from topics when the `mqttClient` might not be initialized yet.

* Add an `mqttClient` initialization check before adding or removing a topic.
Re-align logic with the `MqttPahoMessageDrivenChannelAdapter`

**Cherry-pick to `5.5.x`**
@mrpiggi
Contributor Author

mrpiggi commented Apr 6, 2022

I know I'm way too late to the party, but comparing the implementations of MqttPahoMessageDrivenChannelAdapter and Mqttv5PahoMessageDrivenChannelAdapter raises a question: should topics for which mqttClient.subscribe() or mqttClient.unsubscribe() throws an MqttException remain registered in AbstractMqttMessageDrivenChannelAdapter or not? Currently, the implementations are not consistent in this respect.

@artembilan Do you have an opinion on this? If so, I would provide a corresponding PR.

@artembilan
Member

@mrpiggi ,

I'm not sure I understand your question, but MqttPahoMessageDrivenChannelAdapter and Mqttv5PahoMessageDrivenChannelAdapter differ in that they use different MQTT client libraries.
We just cannot pull that logic up into the superclass.

Would you mind elaborating on what you see as generic enough to be pulled up into the AbstractMqttMessageDrivenChannelAdapter?

Thanks

@mrpiggi
Contributor Author

mrpiggi commented Apr 7, 2022

@artembilan Sorry for being a bit lazy and unspecific.

Regarding addTopic(String topic, int qos), MqttPahoMessageDrivenChannelAdapter does:

@Override
public void addTopic(String topic, int qos) {
    this.topicLock.lock();
    try {
        super.addTopic(topic, qos);
        if (this.client != null && this.client.isConnected()) {
            this.client.subscribe(topic, qos);
        }
    }
    catch (MqttException e) {
        super.removeTopic(topic);
        throw new MessagingException("Failed to subscribe to topic " + topic, e);
    }
    finally {
        this.topicLock.unlock();
    }
}

whereas Mqttv5PahoMessageDrivenChannelAdapter does:

@Override
public void addTopic(String topic, int qos) {
    this.topicLock.lock();
    try {
        super.addTopic(topic, qos);
        if (this.mqttClient != null && this.mqttClient.isConnected()) {
            this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout());
        }
    }
    catch (MqttException ex) {
        throw new MessagingException("Failed to subscribe to topic " + topic, ex);
    }
    finally {
        this.topicLock.unlock();
    }
}

In case of an exception, super.removeTopic() is invoked only by MqttPahoMessageDrivenChannelAdapter, not by Mqttv5PahoMessageDrivenChannelAdapter. For the latter, the topic therefore remains in Set<Topic> topics within AbstractMqttMessageDrivenChannelAdapter and is still reflected by getTopic() and getQos().

For removeTopic(String... topic), both adapters call super.removeTopic(topic) after client.unsubscribe(topic), so in case of an exception these topics remain in Set<Topic> topics. I did not dig into MqttAsyncClient, so I am not sure whether removing the given topics from Set<Topic> topics in any case would be sensible.

Additionally, AbstractMqttMessageDrivenChannelAdapter.addTopic(String... topic) and AbstractMqttMessageDrivenChannelAdapter.addTopic(String[] topic, int[] qos) are overridden by neither MqttPahoMessageDrivenChannelAdapter nor Mqttv5PahoMessageDrivenChannelAdapter, so calling these methods after initialization won't trigger a subscription on the client at all.
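
The inconsistency in the failure path can be shown in isolation. The sketch below is only an illustration under stated assumptions: `FailingClient` and `RollbackSketch` are hypothetical names, a plain `Exception` stands in for `MqttException`, and `IllegalStateException` stands in for `MessagingException`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical client whose subscribe() always fails, standing in for a
// broken connection.
class FailingClient {
    void subscribe(String topic, int qos) throws Exception {
        throw new Exception("simulated subscribe failure for " + topic);
    }
}

class RollbackSketch {
    final Map<String, Integer> topics = new LinkedHashMap<>();
    final FailingClient client = new FailingClient();

    // Same shape as the MqttPahoMessageDrivenChannelAdapter variant:
    // register first, undo the registration when subscribing fails.
    void addTopicWithRollback(String topic, int qos) {
        try {
            topics.put(topic, qos);
            client.subscribe(topic, qos);
        }
        catch (Exception ex) {
            topics.remove(topic);
            throw new IllegalStateException("Failed to subscribe to topic " + topic, ex);
        }
    }

    // Same shape as the Mqttv5PahoMessageDrivenChannelAdapter variant:
    // the topic stays registered even though subscribing failed.
    void addTopicWithoutRollback(String topic, int qos) {
        try {
            topics.put(topic, qos);
            client.subscribe(topic, qos);
        }
        catch (Exception ex) {
            throw new IllegalStateException("Failed to subscribe to topic " + topic, ex);
        }
    }
}
```

After a failed call, only the variant without rollback still reports the topic in `topics`, which is the difference described above.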

@artembilan
Member

@mrpiggi ,

great observation and investigation!

I'd be glad to see a PR from you for review.

Thanks

@mrpiggi
Contributor Author

mrpiggi commented Apr 22, 2022

My analysis was incomplete. As AbstractMqttMessageDrivenChannelAdapter.addTopic(String... topic) and AbstractMqttMessageDrivenChannelAdapter.addTopic(String[] topic, int[] qos) both call AbstractMqttMessageDrivenChannelAdapter.addTopic(String topic, int qos) and this method is overridden, this aspect is fine.
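
A small sketch of why the delegation makes this fine: the bulk overloads in the base class call the single-topic method, so a subclass override of that method is picked up through dynamic dispatch. The class names and the default QoS of 1 used for the varargs overload are assumptions for illustration, not taken from the Spring Integration sources.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical base class: both bulk overloads funnel into addTopic(String, int).
class BaseAdapterSketch {
    final List<String> registered = new ArrayList<>();

    void addTopic(String topic, int qos) {
        registered.add(topic);
    }

    void addTopic(String[] topic, int[] qos) {
        for (int i = 0; i < topic.length; i++) {
            addTopic(topic[i], qos[i]); // dispatches to the subclass override
        }
    }

    void addTopic(String... topic) {
        int[] qos = new int[topic.length];
        Arrays.fill(qos, 1); // assumed default QoS
        addTopic(topic, qos);
    }
}

// Hypothetical subclass: overriding the single-topic method is enough.
class SubscribingAdapterSketch extends BaseAdapterSketch {
    final List<String> subscribed = new ArrayList<>();

    @Override
    void addTopic(String topic, int qos) {
        super.addTopic(topic, qos);
        subscribed.add(topic + "@" + qos); // stands in for client.subscribe(topic, qos)
    }
}
```

Calling `addTopic("a", "b")` on a `SubscribingAdapterSketch` goes through the varargs overload and still ends up in the overridden method for each topic.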

So the only open question is whether or not to call AbstractMqttMessageDrivenChannelAdapter.removeTopic(String... topic) on an exception. Since topics added to either adapter before initialization are always reflected by Set<Topic> topics, and topics in general are never removed automatically, deleting the super.removeTopic(topic) call from the catch block of MqttPahoMessageDrivenChannelAdapter.addTopic(String topic, int qos) would probably be the easiest way to retain consistency. Probably not worth a separate PR.

@artembilan
Member

Why not vice versa: add super.removeTopic(topic); to the addTopic() of the Mqttv5PahoMessageDrivenChannelAdapter?
I think this would be safer, since we know that the topic failed somehow.

It doesn't look, though, like we can track a failed topic from the subscribe(String[] topicFilters, int[] qos) call to be able to remove it afterwards...

@mrpiggi
Contributor Author

mrpiggi commented Apr 25, 2022

My intention was to get consistent behavior regardless of whether the adapter has already been initialized or not. If addTopic(String topic, int qos) is used before initialization, the topic is added to Set<Topic> topics in any case and remains in it when doStart() is called, even in case of an exception.

There is also the question of what could actually lead to an MqttException. The cause can be queried with MqttException.getReasonCode(), but the documentation in org.eclipse.paho for this is rather rudimentary. As far as I can see, an MqttException is only thrown in case of connection problems, which in turn means that once a connection is re-established, it should be possible to retry the subscription for a previously registered topic. Incidentally, an incorrectly formatted topic throws an IllegalArgumentException and thus always remains registered in Set<Topic> topics.

To work around this, one possibility would be to define a boolean field subscribed within AbstractMqttMessageDrivenChannelAdapter.Topic and set this to true if the call to client.subscribe() does not throw an exception.
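
Roughly, the idea could look like the sketch below. This is an assumption-laden illustration, not a proposal for the actual classes: `TrackedTopic`, `FlakyClient`, `TrackingAdapterSketch` and `onReconnect()` are hypothetical names, and a plain `Exception` stands in for `MqttException`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical counterpart of AbstractMqttMessageDrivenChannelAdapter.Topic
// with the proposed flag.
class TrackedTopic {
    final String name;
    final int qos;
    boolean subscribed; // true only after subscribe() returned without exception

    TrackedTopic(String name, int qos) {
        this.name = name;
        this.qos = qos;
    }
}

// Hypothetical client that fails while disconnected.
class FlakyClient {
    boolean connected;

    void subscribe(String topic, int qos) throws Exception {
        if (!connected) {
            throw new Exception("not connected");
        }
    }
}

class TrackingAdapterSketch {
    final Map<String, TrackedTopic> topics = new LinkedHashMap<>();
    final FlakyClient client = new FlakyClient();

    void addTopic(String name, int qos) {
        TrackedTopic topic = new TrackedTopic(name, qos);
        topics.put(name, topic); // always registered, as before initialization
        trySubscribe(topic);
    }

    // Assumed hook: retry every topic that is registered but not subscribed.
    void onReconnect() {
        for (TrackedTopic topic : topics.values()) {
            if (!topic.subscribed) {
                trySubscribe(topic);
            }
        }
    }

    private void trySubscribe(TrackedTopic topic) {
        try {
            client.subscribe(topic.name, topic.qos);
            topic.subscribed = true;
        }
        catch (Exception ex) {
            topic.subscribed = false; // stays registered, can be retried later
        }
    }
}
```

In this variant a failed subscription never removes the topic. The flag records whether the broker side is in sync, so a reconnect can pick up what is missing.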

@artembilan
Member

Good. If you have something in mind and can come up with a unit test to verify that behavior, I won't mind reviewing your contribution.

Thanks
