Skip to content

Commit 6110d85

Browse files
authored
Adds exponential backoff to MQTT (#1018)
1 parent d065032 commit 6110d85

File tree

1 file changed

+87
-52
lines changed
  • iot/api-client/manager/src/main/java/com/example/cloud/iot/examples

1 file changed

+87
-52
lines changed

iot/api-client/manager/src/main/java/com/example/cloud/iot/examples/MqttExample.java

Lines changed: 87 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public static void main(String[] args) throws Exception {
153153
options.projectId, options.cloudRegion, options.registryId, options.deviceId);
154154

155155
MqttConnectOptions connectOptions = new MqttConnectOptions();
156-
// Note that the the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
156+
// Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
157157
// explictly set this. If you don't set MQTT version, the server will immediately close its
158158
// connection to your device.
159159
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
@@ -179,66 +179,101 @@ public static void main(String[] args) throws Exception {
179179
// [START cloudiotcore_mqtt_publish]
180180
// Create a client, and connect to the Google MQTT bridge.
181181
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());
182-
try {
183-
client.connect(connectOptions);
184-
attachCallback(client, options.deviceId);
185-
186-
// Publish to the events or state topic based on the flag.
187-
String subTopic = options.messageType.equals("event") ? "events" : options.messageType;
188-
189-
// The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
190-
// required to be in the format below. Note that this is not the same as the device registry's
191-
// Cloud Pub/Sub topic.
192-
String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);
193-
194-
// Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
195-
for (int i = 1; i <= options.numMessages; ++i) {
196-
String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
197-
System.out.format(
198-
"Publishing %s message %d/%d: '%s'\n",
199-
options.messageType, i, options.numMessages, payload);
200-
201-
// Refresh the connection credentials before the JWT expires.
202-
// [START cloudiotcore_mqtt_jwt_refresh]
203-
long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000;
204-
if (secsSinceRefresh > (options.tokenExpMins * 60)) {
205-
System.out.format("\tRefreshing token after: %d seconds\n", secsSinceRefresh);
206-
iat = new DateTime();
207-
if (options.algorithm.equals("RS256")) {
208-
connectOptions.setPassword(
209-
createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
210-
} else if (options.algorithm.equals("ES256")) {
211-
connectOptions.setPassword(
212-
createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
213-
} else {
214-
throw new IllegalArgumentException(
215-
"Invalid algorithm " + options.algorithm
216-
+ ". Should be one of 'RS256' or 'ES256'.");
182+
183+
// Both connect and publish operations may fail. If they do, allow retries but with an
184+
// exponential backoff time period.
185+
long initialConnectIntervalMillis = 500L;
186+
long maxConnectIntervalMillis = 6000L;
187+
long maxConnectRetryTimeElapsedMillis = 900000L;
188+
float intervalMultiplier = 1.5f;
189+
190+
long retryIntervalMs = initialConnectIntervalMillis;
191+
long totalRetryTimeMs = 0;
192+
193+
while (!client.isConnected() && totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) {
194+
try {
195+
client.connect(connectOptions);
196+
} catch (MqttException e) {
197+
int reason = e.getReasonCode();
198+
199+
// If the connection is lost or if the server cannot be connected, allow retries, but with
200+
// exponential backoff.
201+
System.out.println("An error occurred: " + e.getMessage());
202+
if (reason == MqttException.REASON_CODE_CONNECTION_LOST
203+
|| reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
204+
System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
205+
Thread.sleep(retryIntervalMs);
206+
totalRetryTimeMs += retryIntervalMs;
207+
retryIntervalMs *= intervalMultiplier;
208+
if (retryIntervalMs > maxConnectIntervalMillis) {
209+
retryIntervalMs = maxConnectIntervalMillis;
217210
}
218-
client.disconnect();
219-
client.connect();
220-
attachCallback(client, options.deviceId);
211+
} else {
212+
throw e;
221213
}
222-
// [END cloudiotcore_mqtt_jwt_refresh]
214+
}
215+
}
216+
217+
attachCallback(client, options.deviceId);
223218

224-
// Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
225-
// also supports qos=0 for at most once delivery.
226-
MqttMessage message = new MqttMessage(payload.getBytes());
227-
message.setQos(1);
228-
client.publish(mqttTopic, message);
219+
// Publish to the events or state topic based on the flag.
220+
String subTopic = options.messageType.equals("event") ? "events" : options.messageType;
229221

230-
if (options.messageType.equals("event")) {
231-
// Send telemetry events every second
232-
Thread.sleep(1000);
222+
// The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
223+
// required to be in the format below. Note that this is not the same as the device registry's
224+
// Cloud Pub/Sub topic.
225+
String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);
226+
227+
// Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
228+
for (int i = 1; i <= options.numMessages; ++i) {
229+
String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
230+
System.out.format(
231+
"Publishing %s message %d/%d: '%s'\n",
232+
options.messageType, i, options.numMessages, payload);
233+
234+
// Refresh the connection credentials before the JWT expires.
235+
// [START cloudiotcore_mqtt_jwt_refresh]
236+
long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000;
237+
if (secsSinceRefresh > (options.tokenExpMins * 60)) {
238+
System.out.format("\tRefreshing token after: %d seconds\n", secsSinceRefresh);
239+
iat = new DateTime();
240+
if (options.algorithm.equals("RS256")) {
241+
connectOptions.setPassword(
242+
createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
243+
} else if (options.algorithm.equals("ES256")) {
244+
connectOptions.setPassword(
245+
createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
233246
} else {
234-
// Note: Update Device state less frequently than with telemetry events
235-
Thread.sleep(5000);
247+
throw new IllegalArgumentException(
248+
"Invalid algorithm " + options.algorithm
249+
+ ". Should be one of 'RS256' or 'ES256'.");
236250
}
251+
client.disconnect();
252+
client.connect();
253+
attachCallback(client, options.deviceId);
254+
}
255+
// [END cloudiotcore_mqtt_jwt_refresh]
256+
257+
// Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
258+
// also supports qos=0 for at most once delivery.
259+
MqttMessage message = new MqttMessage(payload.getBytes());
260+
message.setQos(1);
261+
client.publish(mqttTopic, message);
262+
263+
if (options.messageType.equals("event")) {
264+
// Send telemetry events every second
265+
Thread.sleep(1000);
266+
} else {
267+
// Note: Update Device state less frequently than with telemetry events
268+
Thread.sleep(5000);
237269
}
238-
} finally {
239-
// Disconnect the client and finish the run.
270+
}
271+
272+
// Disconnect the client if still connected, and finish the run.
273+
if (client.isConnected()) {
240274
client.disconnect();
241275
}
276+
242277
System.out.println("Finished loop successfully. Goodbye!");
243278
// [END cloudiotcore_mqtt_publish]
244279
}

0 commit comments

Comments
 (0)