Skip to content

Commit bad160a

Browse files
authored
feat: keepalive manager (#1865)
* fix: keepalive management * fix: skip shift on 'publish' packets * fix: better keepalive checks * fix: refactor methods * fix: example * refactor: rename method * fix: keepalive tests * fix: remove .only * fix: connack * fix: log mock * fix: do not allow setting 0 as keepalive in manager * fix: remove useless rescheduling on connack * fix: flaky test * fix: add math.ceil to keepalive interval every
1 parent ea4ec78 commit bad160a

12 files changed

+331
-271
lines changed

DEVELOPMENT.md

+1-2
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@ npm test
1919

2020
This will run both `browser` and `node` tests.
2121

22-
2322
### Running specific tests
2423

25-
For example, you can run `node -r esbuild-register --test test/pingTimer.ts`
24+
For example, you can run `node -r esbuild-register --test test/keepaliveManager.ts`
2625

2726
### Browser
2827

example.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import mqtt from './src/index'
22

3-
const client = mqtt.connect('mqtts://test.mosquitto.org', {
4-
keepalive: 10,
5-
port: 8883,
3+
const client = mqtt.connect('mqtt://broker.hivemq.com', {
4+
keepalive: 3,
5+
port: 1883,
66
reconnectPeriod: 15000,
77
rejectUnauthorized: false,
88
})

src/lib/KeepaliveManager.ts

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import type MqttClient from './client'
2+
import getTimer, { type Timer } from './get-timer'
3+
import type { TimerVariant } from './shared'
4+
5+
export default class KeepaliveManager {
6+
private _keepalive: number
7+
8+
private timerId: number
9+
10+
private timer: Timer
11+
12+
private destroyed = false
13+
14+
private counter: number
15+
16+
private client: MqttClient
17+
18+
private _keepaliveTimeoutTimestamp: number
19+
20+
private _intervalEvery: number
21+
22+
/** Timestamp of next keepalive timeout */
23+
get keepaliveTimeoutTimestamp() {
24+
return this._keepaliveTimeoutTimestamp
25+
}
26+
27+
/** Milliseconds of the actual interval */
28+
get intervalEvery() {
29+
return this._intervalEvery
30+
}
31+
32+
get keepalive() {
33+
return this._keepalive
34+
}
35+
36+
constructor(client: MqttClient, variant: TimerVariant) {
37+
this.client = client
38+
this.timer = getTimer(variant)
39+
this.setKeepalive(client.options.keepalive)
40+
}
41+
42+
private clear() {
43+
if (this.timerId) {
44+
this.timer.clear(this.timerId)
45+
this.timerId = null
46+
}
47+
}
48+
49+
/** Change the keepalive */
50+
setKeepalive(value: number) {
51+
// keepalive is in seconds
52+
value *= 1000
53+
54+
if (
55+
// eslint-disable-next-line no-restricted-globals
56+
isNaN(value) ||
57+
value <= 0 ||
58+
value > 2147483647
59+
) {
60+
throw new Error(
61+
`Keepalive value must be an integer between 0 and 2147483647. Provided value is ${value}`,
62+
)
63+
}
64+
65+
this._keepalive = value
66+
67+
this.reschedule()
68+
69+
this.client['log'](`KeepaliveManager: set keepalive to ${value}ms`)
70+
}
71+
72+
destroy() {
73+
this.clear()
74+
this.destroyed = true
75+
}
76+
77+
reschedule() {
78+
if (this.destroyed) {
79+
return
80+
}
81+
82+
this.clear()
83+
this.counter = 0
84+
85+
// https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Figure_3.5_Keep
86+
const keepAliveTimeout = Math.ceil(this._keepalive * 1.5)
87+
88+
this._keepaliveTimeoutTimestamp = Date.now() + keepAliveTimeout
89+
this._intervalEvery = Math.ceil(this._keepalive / 2)
90+
91+
this.timerId = this.timer.set(() => {
92+
// this should never happen, but just in case
93+
if (this.destroyed) {
94+
return
95+
}
96+
97+
this.counter += 1
98+
99+
// after keepalive seconds, send a pingreq
100+
if (this.counter === 2) {
101+
this.client.sendPing()
102+
} else if (this.counter > 2) {
103+
this.client.onKeepaliveTimeout()
104+
}
105+
}, this._intervalEvery)
106+
}
107+
}

src/lib/PingTimer.ts

-56
This file was deleted.

src/lib/client.ts

+29-55
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import {
3939
} from './shared'
4040
import TopicAliasSend from './topic-alias-send'
4141
import { TypedEventEmitter } from './TypedEmitter'
42-
import PingTimer from './PingTimer'
42+
import KeepaliveManager from './KeepaliveManager'
4343
import isBrowser, { isWebWorker } from './is-browser'
4444

4545
const setImmediate =
@@ -433,10 +433,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
433433

434434
public noop: (error?: any) => void
435435

436-
/** Timestamp of last received control packet */
437-
public pingResp: number
438-
439-
public pingTimer: PingTimer
436+
public keepaliveManager: KeepaliveManager
440437

441438
/**
442439
* The connection to the Broker. In browsers env this also have `socket` property
@@ -572,8 +569,8 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
572569
// map of a subscribe messageId and a topic
573570
this.messageIdToTopic = {}
574571

575-
// Ping timer, setup in _setupPingTimer
576-
this.pingTimer = null
572+
// Keepalive manager, setup in _setupKeepaliveManager
573+
this.keepaliveManager = null
577574
// Is the client connected?
578575
this.connected = false
579576
// Are we disconnecting?
@@ -660,7 +657,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
660657
this.log('close :: clearing connackTimer')
661658
clearTimeout(this.connackTimer)
662659

663-
this._destroyPingTimer()
660+
this._destroyKeepaliveManager()
664661

665662
if (this.topicAliasRecv) {
666663
this.topicAliasRecv.clear()
@@ -1780,7 +1777,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
17801777
this._setupReconnect()
17811778
}
17821779

1783-
this._destroyPingTimer()
1780+
this._destroyKeepaliveManager()
17841781

17851782
if (done && !this.connected) {
17861783
this.log(
@@ -2064,45 +2061,36 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
20642061
}
20652062

20662063
/**
2067-
* _setupPingTimer - setup the ping timer
2068-
*
2069-
* @api private
2064+
* _setupKeepaliveManager - setup the keepalive manager
20702065
*/
2071-
private _setupPingTimer() {
2066+
private _setupKeepaliveManager() {
20722067
this.log(
2073-
'_setupPingTimer :: keepalive %d (seconds)',
2068+
'_setupKeepaliveManager :: keepalive %d (seconds)',
20742069
this.options.keepalive,
20752070
)
20762071

2077-
if (!this.pingTimer && this.options.keepalive) {
2078-
this.pingTimer = new PingTimer(
2079-
this.options.keepalive,
2080-
() => {
2081-
this._checkPing()
2082-
},
2072+
if (!this.keepaliveManager && this.options.keepalive) {
2073+
this.keepaliveManager = new KeepaliveManager(
2074+
this,
20832075
this.options.timerVariant,
20842076
)
2085-
this.pingResp = Date.now()
20862077
}
20872078
}
20882079

2089-
private _destroyPingTimer() {
2090-
if (this.pingTimer) {
2091-
this.log('_destroyPingTimer :: destroying ping timer')
2092-
this.pingTimer.destroy()
2093-
this.pingTimer = null
2080+
private _destroyKeepaliveManager() {
2081+
if (this.keepaliveManager) {
2082+
this.log('_destroyKeepaliveManager :: destroying keepalive manager')
2083+
this.keepaliveManager.destroy()
2084+
this.keepaliveManager = null
20942085
}
20952086
}
20962087

20972088
/**
2098-
2099-
* _shiftPingInterval - reschedule the ping interval
2100-
*
2101-
* @api private
2089+
* Reschedule the ping interval
21022090
*/
2103-
private _shiftPingInterval() {
2091+
public reschedulePing() {
21042092
if (
2105-
this.pingTimer &&
2093+
this.keepaliveManager &&
21062094
this.options.keepalive &&
21072095
this.options.reschedulePings
21082096
) {
@@ -2115,34 +2103,20 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
21152103
*/
21162104
private _reschedulePing() {
21172105
this.log('_reschedulePing :: rescheduling ping')
2118-
this.pingTimer.reschedule()
2106+
this.keepaliveManager.reschedule()
21192107
}
21202108

2121-
/**
2122-
* _checkPing - check if a pingresp has come back, and ping the server again
2123-
*
2124-
* @api private
2125-
*/
2126-
private _checkPing() {
2127-
this.log('_checkPing :: checking ping...')
2128-
// give 100ms offset to avoid ping timeout when receiving fast responses
2129-
const timeSincePing = Date.now() - this.pingResp - 100
2130-
if (timeSincePing <= this.options.keepalive * 1000) {
2131-
this.log('_checkPing :: ping response received in time')
2132-
this._sendPing()
2133-
} else {
2134-
// do a forced cleanup since socket will be in bad shape
2135-
this.emit('error', new Error('Keepalive timeout'))
2136-
this.log('_checkPing :: calling _cleanUp with force true')
2137-
this._cleanUp(true)
2138-
}
2139-
}
2140-
2141-
private _sendPing() {
2109+
public sendPing() {
21422110
this.log('_sendPing :: sending pingreq')
21432111
this._sendPacket({ cmd: 'pingreq' })
21442112
}
21452113

2114+
public onKeepaliveTimeout() {
2115+
this.emit('error', new Error('Keepalive timeout'))
2116+
this.log('onKeepaliveTimeout :: calling _cleanUp with force true')
2117+
this._cleanUp(true)
2118+
}
2119+
21462120
/**
21472121
* _resubscribe
21482122
* @api private
@@ -2205,7 +2179,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
22052179

22062180
this.connackPacket = packet
22072181
this.messageIdProvider.clear()
2208-
this._setupPingTimer()
2182+
this._setupKeepaliveManager()
22092183

22102184
this.connected = true
22112185

src/lib/get-timer.ts

+7-7
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
11
import isBrowser, { isWebWorker, isReactNativeBrowser } from './is-browser'
2-
import { clearTimeout as clearT, setTimeout as setT } from 'worker-timers'
2+
import { clearInterval as clearI, setInterval as setI } from 'worker-timers'
33
import type { TimerVariant } from './shared'
44

55
// dont directly assign globals to class props otherwise this throws in web workers: Uncaught TypeError: Illegal invocation
66
// See: https://stackoverflow.com/questions/9677985/uncaught-typeerror-illegal-invocation-in-chrome
77

88
export interface Timer {
9-
set: typeof setT
10-
clear: typeof clearT
9+
set: typeof setI
10+
clear: typeof clearI
1111
}
1212

1313
const workerTimer: Timer = {
14-
set: setT,
15-
clear: clearT,
14+
set: setI,
15+
clear: clearI,
1616
}
1717

1818
const nativeTimer: Timer = {
19-
set: (func, time) => setTimeout(func, time),
20-
clear: (timerId) => clearTimeout(timerId),
19+
set: (func, time) => setInterval(func, time),
20+
clear: (timerId) => clearInterval(timerId),
2121
}
2222

2323
const getTimer = (variant: TimerVariant): Timer => {

src/lib/handlers/connack.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ const handleConnack: PacketHandler = (client, packet: IConnackPacket) => {
2929
}
3030
if (packet.properties.serverKeepAlive && options.keepalive) {
3131
options.keepalive = packet.properties.serverKeepAlive
32-
client['_shiftPingInterval']()
3332
}
33+
3434
if (packet.properties.maximumPacketSize) {
3535
if (!options.properties) {
3636
options.properties = {}

0 commit comments

Comments
 (0)