Skip to content

GH-3732 make mqttClient initialize lazily #3747

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from

Conversation

mipo256
Copy link

@mipo256 mipo256 commented Mar 14, 2022

Fixing #3732 issue. Also simplified the Topic#equals() method.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, find some review on the lines.
Thank you so far!

@@ -140,7 +141,7 @@ protected MqttMessageConverter getConverter() {
}

@ManagedAttribute
public String[] getTopic() {
public String[] getTopicNames() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you rename the method?

I might be agree that your one has more sense, but for consistency with others we need to keep the old name.
See addTopic(), removeTopic().

Even if we call it getTopics(), we still need to keep the old one for backward compatibility.
Even if we deprecate it.

I mean the bad method name is not a strong justification to introduce a braking change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, you are right. I will leave the name of the method unchanged, thanks

try {
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
}
catch (MqttException ex) {
} catch (MqttException ex) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not correct code style.

Please, run ./gradlew clean :spring-integration-mqtt:check to see all the Checkstyle violations.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, sure, I am sorry for violating the project code style, I will polish the PR before merge. Thanks!

@@ -187,33 +190,40 @@ protected void doStart() {
@Override
protected void doStop() {
this.topicLock.lock();
String[] topics = getTopic();
initializeMqttAsyncClientIfRequired();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's something wrong in the stop() logic.
if there is not client initialized why would one wants to initialize it form stopping?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right, in general. There is clearly, from a logical perspective no need to initialize mqttClient in case we are done with this ChannelAdapter, so I agree.

But we still need to avoid NullPointerException, right? Falling with this exception will not be good, so maybe we can end up with the following scenario: we just check, whether mqttClient is not null, and if so, we will apply disconnection logic, otherwise we will just do nothing

logger.error(ex, "Failed to close 'MqttAsyncClient'");
}
}

@Override
public void addTopic(String topic, int qos) {
this.topicLock.lock();
initializeMqttAsyncClientIfRequired();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not OK, too.
The addTopic() could be called from the Spring context initialization phase. Therefore it is too aggressive to make a low-level resource connection.

I believe the logic here must be similar to the one in the MqttPahoMessageDrivenChannelAdapter:

if (this.client != null && this.client.isConnected()) {
		this.client.subscribe(topic, qos);
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so I got the idea. We will just subscribe to the passed topic with passed QoS in case the mqttClient have being initialized, otherwise we will just maybe print a warning, just to notify the user.

@artembilan I think we should also omit the super.addTopic(topic, qos) invocation in the case of mqttClient have not been initialized, in order to not to get in trouble to return with getTopic() the name of the topic onto which we did not subscribed really.

Copy link
Author

@mipo256 mipo256 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artembilan I have adjusted the PR after your comments, can you, please, take a look again?

@@ -187,33 +190,40 @@ protected void doStart() {
@Override
protected void doStop() {
this.topicLock.lock();
String[] topics = getTopic();
initializeMqttAsyncClientIfRequired();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right, in general. There is clearly, from a logical perspective no need to initialize mqttClient in case we are done with this ChannelAdapter, so I agree.

But we still need to avoid NullPointerException, right? Falling with this exception will not be good, so maybe we can end up with the following scenario: we just check, whether mqttClient is not null, and if so, we will apply disconnection logic, otherwise we will just do nothing

try {
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
}
catch (MqttException ex) {
} catch (MqttException ex) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, sure, I am sorry for violating the project code style, I will polish the PR before merge. Thanks!

logger.error(ex, "Failed to close 'MqttAsyncClient'");
}
}

@Override
public void addTopic(String topic, int qos) {
this.topicLock.lock();
initializeMqttAsyncClientIfRequired();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so I got the idea. We will just subscribe to the passed topic with passed QoS in case the mqttClient have being initialized, otherwise we will just maybe print a warning, just to notify the user.

@artembilan I think we should also omit the super.addTopic(topic, qos) invocation in the case of mqttClient have not been initialized, in order to not to get in trouble to return with getTopic() the name of the topic onto which we did not subscribed really.

@mipo256 mipo256 requested a review from artembilan March 15, 2022 20:52
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also would like to see some simple unit test to verify the we don't have a reported in the issue NPE any more.

Thanks

this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout());
super.addTopic(topic, qos);
} else {
logger.warn(String.format("An attempt to add topic : '%s' with QoS : '%s' without having mqtt client initialized or connected, the topic will not be added!", topic, qos));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is so bad to store topics (super.addTopic(topic, qos);) for the future use.
Something similar what we have so far in the MqttPahoMessageDrivenChannelAdapter.
So, please, change the logic over here to the old one and subscribe only if there is an mqttClient.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, ok, perhaps) I just retained a warning in case mqttClient is not initialized

}
}

private void initializeMqttAsyncClient() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method becomes and obsolete right now, so better to restore the old code of the onInit()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, I have removed it

@mipo256
Copy link
Author

mipo256 commented Mar 17, 2022

@artembilan I have supplied the RP with 2 simple unit tests and squashed commits into one, can you please review?

Thanks

@artembilan
Copy link
Member

squashed commits into one

That was not good, since now I have to review all your changes.
While without squashing there would be only need to take a look into fresh commits.

Not problem for now: jut want to give you a hint how it would be easier to collaborate in the future. 😄
We squash it in the end when we merge it upstream.

this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
} else {
logger.warn("An attempt to stop mqtt client without having one initialized or connected");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still want you to remove this warn.

The point is that component could not be started at all from the beginning for some reasons, but stop() is called when we close our application context.
Or the stop might be called via some centralized logic.
And the main feature of the stop is to be idempotent: nothing to stop - just exit silently!
So, such a warning would be a noise in logs in some target applications.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, that make sense, I will remove this warning, thank you!

if (mqttClient != null) {
this.mqttClient.close(true);
}
} catch (MqttException ex) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, run gradle :spring-integration-mqtt:check to revise your Checkstyle violations and fix them, please.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I have fixed all of the Checkstyle violations

if (mqttClient != null && mqttClient.isConnected()) {
this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout());
} else {
logger.warn(String.format("An attempt to add topic : '%s' with QoS : '%s' without having mqtt client initialized or connected", topic, qos));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one has to be removed, too.
It is fully OK to add topics which could be subscribed later on, when start() is called.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also fixed

if (mqttClient != null && mqttClient.isConnected()) {
this.mqttClient.unsubscribe(topic).waitForCompletion(getCompletionTimeout());
} else {
logger.warn(String.format("An attempt to remove topics : '%s' without having mqtt client initialized or connected", Arrays.toString(topic)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DITTO

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also fixed

class Mqttv5PahoMessageDrivenChannelAdapterTest {

@Test //GH-3732
public void testNoNpeIsNotThrownInCaseDoInitIsNotInvokedBeforeTopicAddition() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just add these new tests into an existing Mqttv5BackToBackTests?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I did not noticed this test class, I will move the test methods into Mqttv5BackToBackTests

Copy link
Author

@mipo256 mipo256 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have fixed all of your suggestions and verified the code with Checkstyle, sorry for being annoying :)

this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
} else {
logger.warn("An attempt to stop mqtt client without having one initialized or connected");
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, that make sense, I will remove this warning, thank you!

class Mqttv5PahoMessageDrivenChannelAdapterTest {

@Test //GH-3732
public void testNoNpeIsNotThrownInCaseDoInitIsNotInvokedBeforeTopicAddition() {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I did not noticed this test class, I will move the test methods into Mqttv5BackToBackTests

if (mqttClient != null) {
this.mqttClient.close(true);
}
} catch (MqttException ex) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I have fixed all of the Checkstyle violations

if (mqttClient != null && mqttClient.isConnected()) {
this.mqttClient.unsubscribe(topic).waitForCompletion(getCompletionTimeout());
} else {
logger.warn(String.format("An attempt to remove topics : '%s' without having mqtt client initialized or connected", Arrays.toString(topic)));
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also fixed

if (mqttClient != null && mqttClient.isConnected()) {
this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout());
} else {
logger.warn(String.format("An attempt to add topic : '%s' with QoS : '%s' without having mqtt client initialized or connected", topic, qos));
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also fixed

@mipo256 mipo256 requested a review from artembilan March 18, 2022 15:07
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pulling locally for the final review and possible merge...

@artembilan
Copy link
Member

Merged and cherry-picked to 5.5.x as c1d29ab after some minor code clean up.

@mikhail2048 ,

thank you very much for the contribution; looking forward for more!

@artembilan artembilan closed this Mar 18, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants