From 4b1687c3b49798cd6c36d37982cf489ca0dccfb3 Mon Sep 17 00:00:00 2001 From: Silvio Giebl Date: Sun, 15 Dec 2019 15:04:05 +0100 Subject: [PATCH 1/2] Also send ping when no message has been read for the keep alive time --- .../internal/mqtt/handler/ping/MqttPingHandler.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/ping/MqttPingHandler.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/ping/MqttPingHandler.java index 0633ec2c4..8b3f0fce5 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/ping/MqttPingHandler.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/ping/MqttPingHandler.java @@ -51,6 +51,7 @@ public class MqttPingHandler extends MqttConnectionAwareHandler private final long keepAliveNanos; private long lastFlushTimeNanos; + private long lastReadTimeNanos; private boolean pingReqWritten; private boolean pingReqFlushed; private boolean messageRead; @@ -63,7 +64,7 @@ public MqttPingHandler(final int keepAlive) { @Override public void handlerAdded(final @NotNull ChannelHandlerContext ctx) { super.handlerAdded(ctx); - lastFlushTimeNanos = System.nanoTime(); + lastFlushTimeNanos = lastReadTimeNanos = System.nanoTime(); schedule(ctx, keepAliveNanos); } @@ -75,6 +76,7 @@ public void flush(final @NotNull ChannelHandlerContext ctx) { @Override public void channelRead(final @NotNull ChannelHandlerContext ctx, final @NotNull Object msg) { + lastReadTimeNanos = System.nanoTime(); if (msg instanceof MqttPingResp) { messageRead = true; } else { @@ -104,13 +106,15 @@ public void run() { } pingReqFlushed = false; messageRead = false; - final long nextDelayNanos = keepAliveNanos - (System.nanoTime() - lastFlushTimeNanos); + final long timeNanos = System.nanoTime(); + final long nextDelayNanos = keepAliveNanos - (timeNanos - Math.min(lastReadTimeNanos, lastFlushTimeNanos)); if (nextDelayNanos > 1_000) { pingReqWritten = false; schedule(ctx, nextDelayNanos); } else { pingReqWritten = true; schedule(ctx, keepAliveNanos); + lastFlushTimeNanos = timeNanos; ctx.writeAndFlush(MqttPingReq.INSTANCE).addListener(this); } } From 6ab0e17655c39065a959055d3d3539cac245f264 Mon Sep 17 00:00:00 2001 From: Silvio Giebl Date: Fri, 27 Dec 2019 08:34:28 +0100 Subject: [PATCH 2/2] Initialize ping with connect flush time so the first ping is sent earlier --- .../mqtt/handler/connect/MqttConnectHandler.java | 5 ++++- .../internal/mqtt/handler/ping/MqttPingHandler.java | 13 +++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnectHandler.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnectHandler.java index e81733f80..fad34a911 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnectHandler.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnectHandler.java @@ -76,6 +76,7 @@ public class MqttConnectHandler extends MqttTimeoutInboundHandler { private final @NotNull MqttDisconnectOnConnAckHandler disconnectOnConnAckHandler; private boolean connectCalled = false; + private long connectFlushTime; @Inject MqttConnectHandler( @@ -122,6 +123,7 @@ public void handlerAdded(final @NotNull ChannelHandlerContext ctx) { * @param ctx the channel handler context. */ private void writeConnect(final @NotNull ChannelHandlerContext ctx) { + connectFlushTime = System.nanoTime(); ctx.writeAndFlush((connect.getRawEnhancedAuthMechanism() == null) ? connect.createStateful(clientConfig.getRawClientIdentifier(), null) : connect).addListener(this); } @@ -173,7 +175,8 @@ private void readConnAck(final @NotNull MqttConnAck connAck, final @NotNull Chan final int keepAlive = connectionConfig.getKeepAlive(); if (keepAlive > 0) { - channel.pipeline().addAfter(MqttDecoder.NAME, MqttPingHandler.NAME, new MqttPingHandler(keepAlive)); + final MqttPingHandler pingHandler = new MqttPingHandler(keepAlive, connectFlushTime, System.nanoTime()); + channel.pipeline().addAfter(MqttDecoder.NAME, MqttPingHandler.NAME, pingHandler); } clientConfig.getRawState().set(MqttClientState.CONNECTED); diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/ping/MqttPingHandler.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/ping/MqttPingHandler.java index 8b3f0fce5..509824adb 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/ping/MqttPingHandler.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/ping/MqttPingHandler.java @@ -57,15 +57,16 @@ public class MqttPingHandler extends MqttConnectionAwareHandler private boolean messageRead; private @Nullable ScheduledFuture timeoutFuture; - public MqttPingHandler(final int keepAlive) { + public MqttPingHandler(final int keepAlive, final long lastFlushTimeNanos, final long lastReadTimeNanos) { keepAliveNanos = TimeUnit.SECONDS.toNanos(keepAlive) - TimeUnit.MILLISECONDS.toNanos(100); + this.lastFlushTimeNanos = lastFlushTimeNanos; + this.lastReadTimeNanos = lastReadTimeNanos; } @Override public void handlerAdded(final @NotNull ChannelHandlerContext ctx) { super.handlerAdded(ctx); - lastFlushTimeNanos = lastReadTimeNanos = System.nanoTime(); - schedule(ctx, keepAliveNanos); + schedule(ctx, nextDelay(System.nanoTime())); } @Override @@ -89,6 +90,10 @@ private void schedule(final @NotNull ChannelHandlerContext ctx, final long delay timeoutFuture = ctx.executor().schedule(this, delayNanos, TimeUnit.NANOSECONDS); } + private long nextDelay(final long timeNanos) { + return keepAliveNanos - (timeNanos - Math.min(lastReadTimeNanos, lastFlushTimeNanos)); + } + @Override public void run() { if (ctx == null) { @@ -107,7 +112,7 @@ public void run() { pingReqFlushed = false; messageRead = false; final long timeNanos = System.nanoTime(); - final long nextDelayNanos = keepAliveNanos - (timeNanos - Math.min(lastReadTimeNanos, lastFlushTimeNanos)); + final long nextDelayNanos = nextDelay(timeNanos); if (nextDelayNanos > 1_000) { pingReqWritten = false; schedule(ctx, nextDelayNanos);