@@ -57,15 +57,16 @@ public class MqttPingHandler extends MqttConnectionAwareHandler
57
57
private boolean messageRead ;
58
58
private @ Nullable ScheduledFuture <?> timeoutFuture ;
59
59
60
- public MqttPingHandler (final int keepAlive ) {
60
+ public MqttPingHandler (final int keepAlive , final long lastFlushTimeNanos , final long lastReadTimeNanos ) {
61
61
keepAliveNanos = TimeUnit .SECONDS .toNanos (keepAlive ) - TimeUnit .MILLISECONDS .toNanos (100 );
62
+ this .lastFlushTimeNanos = lastFlushTimeNanos ;
63
+ this .lastReadTimeNanos = lastReadTimeNanos ;
62
64
}
63
65
64
66
@ Override
65
67
public void handlerAdded (final @ NotNull ChannelHandlerContext ctx ) {
66
68
super .handlerAdded (ctx );
67
- lastFlushTimeNanos = lastReadTimeNanos = System .nanoTime ();
68
- schedule (ctx , keepAliveNanos );
69
+ schedule (ctx , nextDelay (System .nanoTime ()));
69
70
}
70
71
71
72
@ Override
@@ -89,6 +90,10 @@ private void schedule(final @NotNull ChannelHandlerContext ctx, final long delay
89
90
timeoutFuture = ctx .executor ().schedule (this , delayNanos , TimeUnit .NANOSECONDS );
90
91
}
91
92
93
+ private long nextDelay (final long timeNanos ) {
94
+ return keepAliveNanos - (timeNanos - Math .min (lastReadTimeNanos , lastFlushTimeNanos ));
95
+ }
96
+
92
97
@ Override
93
98
public void run () {
94
99
if (ctx == null ) {
@@ -107,7 +112,7 @@ public void run() {
107
112
pingReqFlushed = false ;
108
113
messageRead = false ;
109
114
final long timeNanos = System .nanoTime ();
110
- final long nextDelayNanos = keepAliveNanos - (timeNanos - Math . min ( lastReadTimeNanos , lastFlushTimeNanos ) );
115
+ final long nextDelayNanos = nextDelay (timeNanos );
111
116
if (nextDelayNanos > 1_000 ) {
112
117
pingReqWritten = false ;
113
118
schedule (ctx , nextDelayNanos );
0 commit comments