Skip to content

Commit 4932452

Browse files
committed
Initialize ping with connect flush time so the first ping is sent earlier
1 parent d31a4d2 commit 4932452

File tree

2 files changed

+14
-7
lines changed

2 files changed

+14
-7
lines changed

src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnectHandler.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public class MqttConnectHandler extends MqttTimeoutInboundHandler {
7676
private final @NotNull MqttDisconnectOnConnAckHandler disconnectOnConnAckHandler;
7777

7878
private boolean connectCalled = false;
79+
private long connectFlushTime;
7980

8081
@Inject
8182
MqttConnectHandler(
@@ -122,6 +123,7 @@ public void handlerAdded(final @NotNull ChannelHandlerContext ctx) {
122123
* @param ctx the channel handler context.
123124
*/
124125
private void writeConnect(final @NotNull ChannelHandlerContext ctx) {
126+
connectFlushTime = System.nanoTime();
125127
ctx.writeAndFlush((connect.getRawEnhancedAuthMechanism() == null) ?
126128
connect.createStateful(clientConfig.getRawClientIdentifier(), null) : connect).addListener(this);
127129
}
@@ -163,8 +165,7 @@ public void channelRead(final @NotNull ChannelHandlerContext ctx, final @NotNull
163165
*/
164166
private void readConnAck(final @NotNull MqttConnAck connAck, final @NotNull Channel channel) {
165167
if (connAck.getReasonCode().isError()) {
166-
MqttDisconnectUtil.close(channel, new Mqtt5ConnAckException(
167-
connAck,
168+
MqttDisconnectUtil.close(channel, new Mqtt5ConnAckException(connAck,
168169
"CONNECT failed as CONNACK contained an Error Code: " + connAck.getReasonCode() + "."));
169170

170171
} else if (validateClientIdentifier(connAck, channel)) {
@@ -178,7 +179,8 @@ private void readConnAck(final @NotNull MqttConnAck connAck, final @NotNull Chan
178179

179180
final int keepAlive = connectionConfig.getKeepAlive();
180181
if (keepAlive > 0) {
181-
channel.pipeline().addAfter(MqttDecoder.NAME, MqttPingHandler.NAME, new MqttPingHandler(keepAlive));
182+
final MqttPingHandler pingHandler = new MqttPingHandler(keepAlive, connectFlushTime, System.nanoTime());
183+
channel.pipeline().addAfter(MqttDecoder.NAME, MqttPingHandler.NAME, pingHandler);
182184
}
183185

184186
clientConfig.getRawState().set(MqttClientState.CONNECTED);

src/main/java/com/hivemq/client/internal/mqtt/handler/ping/MqttPingHandler.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,16 @@ public class MqttPingHandler extends MqttConnectionAwareHandler
5757
private boolean messageRead;
5858
private @Nullable ScheduledFuture<?> timeoutFuture;
5959

60-
public MqttPingHandler(final int keepAlive) {
60+
public MqttPingHandler(final int keepAlive, final long lastFlushTimeNanos, final long lastReadTimeNanos) {
6161
keepAliveNanos = TimeUnit.SECONDS.toNanos(keepAlive) - TimeUnit.MILLISECONDS.toNanos(100);
62+
this.lastFlushTimeNanos = lastFlushTimeNanos;
63+
this.lastReadTimeNanos = lastReadTimeNanos;
6264
}
6365

6466
@Override
6567
public void handlerAdded(final @NotNull ChannelHandlerContext ctx) {
6668
super.handlerAdded(ctx);
67-
lastFlushTimeNanos = lastReadTimeNanos = System.nanoTime();
68-
schedule(ctx, keepAliveNanos);
69+
schedule(ctx, nextDelay(System.nanoTime()));
6970
}
7071

7172
@Override
@@ -89,6 +90,10 @@ private void schedule(final @NotNull ChannelHandlerContext ctx, final long delay
8990
timeoutFuture = ctx.executor().schedule(this, delayNanos, TimeUnit.NANOSECONDS);
9091
}
9192

93+
private long nextDelay(final long timeNanos) {
94+
return keepAliveNanos - (timeNanos - Math.min(lastReadTimeNanos, lastFlushTimeNanos));
95+
}
96+
9297
@Override
9398
public void run() {
9499
if (ctx == null) {
@@ -107,7 +112,7 @@ public void run() {
107112
pingReqFlushed = false;
108113
messageRead = false;
109114
final long timeNanos = System.nanoTime();
110-
final long nextDelayNanos = keepAliveNanos - (timeNanos - Math.min(lastReadTimeNanos, lastFlushTimeNanos));
115+
final long nextDelayNanos = nextDelay(timeNanos);
111116
if (nextDelayNanos > 1_000) {
112117
pingReqWritten = false;
113118
schedule(ctx, nextDelayNanos);

0 commit comments

Comments
 (0)