From 9108e27388aa560baf2a51322359c46c01715798 Mon Sep 17 00:00:00 2001 From: Gus Class Date: Thu, 8 Feb 2018 12:12:56 -0800 Subject: [PATCH] Adds exponential backoff to MQTT --- .../cloud/iot/examples/MqttExample.java | 139 +++++++++++------- 1 file changed, 87 insertions(+), 52 deletions(-) diff --git a/iot/api-client/manager/src/main/java/com/example/cloud/iot/examples/MqttExample.java b/iot/api-client/manager/src/main/java/com/example/cloud/iot/examples/MqttExample.java index 24c741837ef..d5405c019eb 100644 --- a/iot/api-client/manager/src/main/java/com/example/cloud/iot/examples/MqttExample.java +++ b/iot/api-client/manager/src/main/java/com/example/cloud/iot/examples/MqttExample.java @@ -153,7 +153,7 @@ public static void main(String[] args) throws Exception { options.projectId, options.cloudRegion, options.registryId, options.deviceId); MqttConnectOptions connectOptions = new MqttConnectOptions(); - // Note that the the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we + // Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we // explictly set this. If you don't set MQTT version, the server will immediately close its // connection to your device. connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); @@ -179,66 +179,101 @@ public static void main(String[] args) throws Exception { // [START cloudiotcore_mqtt_publish] // Create a client, and connect to the Google MQTT bridge. MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence()); - try { - client.connect(connectOptions); - attachCallback(client, options.deviceId); - - // Publish to the events or state topic based on the flag. - String subTopic = options.messageType.equals("event") ? "events" : options.messageType; - - // The MQTT topic that this device will publish telemetry data to. The MQTT topic name is - // required to be in the format below. Note that this is not the same as the device registry's - // Cloud Pub/Sub topic. - String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic); - - // Publish numMessages messages to the MQTT bridge, at a rate of 1 per second. - for (int i = 1; i <= options.numMessages; ++i) { - String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i); - System.out.format( - "Publishing %s message %d/%d: '%s'\n", - options.messageType, i, options.numMessages, payload); - - // Refresh the connection credentials before the JWT expires. - // [START cloudiotcore_mqtt_jwt_refresh] - long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000; - if (secsSinceRefresh > (options.tokenExpMins * 60)) { - System.out.format("\tRefreshing token after: %d seconds\n", secsSinceRefresh); - iat = new DateTime(); - if (options.algorithm.equals("RS256")) { - connectOptions.setPassword( - createJwtRsa(options.projectId, options.privateKeyFile).toCharArray()); - } else if (options.algorithm.equals("ES256")) { - connectOptions.setPassword( - createJwtEs(options.projectId, options.privateKeyFile).toCharArray()); - } else { - throw new IllegalArgumentException( - "Invalid algorithm " + options.algorithm - + ". Should be one of 'RS256' or 'ES256'."); + + // Both connect and publish operations may fail. If they do, allow retries but with an + // exponential backoff time period. + long initialConnectIntervalMillis = 500L; + long maxConnectIntervalMillis = 6000L; + long maxConnectRetryTimeElapsedMillis = 900000L; + float intervalMultiplier = 1.5f; + + long retryIntervalMs = initialConnectIntervalMillis; + long totalRetryTimeMs = 0; + + while (!client.isConnected() && totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) { + try { + client.connect(connectOptions); + } catch (MqttException e) { + int reason = e.getReasonCode(); + + // If the connection is lost or if the server cannot be connected, allow retries, but with + // exponential backoff. + System.out.println("An error occurred: " + e.getMessage()); + if (reason == MqttException.REASON_CODE_CONNECTION_LOST + || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) { + System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds."); + Thread.sleep(retryIntervalMs); + totalRetryTimeMs += retryIntervalMs; + retryIntervalMs *= intervalMultiplier; + if (retryIntervalMs > maxConnectIntervalMillis) { + retryIntervalMs = maxConnectIntervalMillis; } - client.disconnect(); - client.connect(); - attachCallback(client, options.deviceId); + } else { + throw e; } - // [END cloudiotcore_mqtt_jwt_refresh] + } + } + + attachCallback(client, options.deviceId); - // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core - // also supports qos=0 for at most once delivery. - MqttMessage message = new MqttMessage(payload.getBytes()); - message.setQos(1); - client.publish(mqttTopic, message); + // Publish to the events or state topic based on the flag. + String subTopic = options.messageType.equals("event") ? "events" : options.messageType; - if (options.messageType.equals("event")) { - // Send telemetry events every second - Thread.sleep(1000); + // The MQTT topic that this device will publish telemetry data to. The MQTT topic name is + // required to be in the format below. Note that this is not the same as the device registry's + // Cloud Pub/Sub topic. + String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic); + + // Publish numMessages messages to the MQTT bridge, at a rate of 1 per second. + for (int i = 1; i <= options.numMessages; ++i) { + String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i); + System.out.format( + "Publishing %s message %d/%d: '%s'\n", + options.messageType, i, options.numMessages, payload); + + // Refresh the connection credentials before the JWT expires. + // [START cloudiotcore_mqtt_jwt_refresh] + long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000; + if (secsSinceRefresh > (options.tokenExpMins * 60)) { + System.out.format("\tRefreshing token after: %d seconds\n", secsSinceRefresh); + iat = new DateTime(); + if (options.algorithm.equals("RS256")) { + connectOptions.setPassword( + createJwtRsa(options.projectId, options.privateKeyFile).toCharArray()); + } else if (options.algorithm.equals("ES256")) { + connectOptions.setPassword( + createJwtEs(options.projectId, options.privateKeyFile).toCharArray()); } else { - // Note: Update Device state less frequently than with telemetry events - Thread.sleep(5000); + throw new IllegalArgumentException( + "Invalid algorithm " + options.algorithm + + ". Should be one of 'RS256' or 'ES256'."); } + client.disconnect(); + client.connect(); + attachCallback(client, options.deviceId); + } + // [END cloudiotcore_mqtt_jwt_refresh] + + // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core + // also supports qos=0 for at most once delivery. + MqttMessage message = new MqttMessage(payload.getBytes()); + message.setQos(1); + client.publish(mqttTopic, message); + + if (options.messageType.equals("event")) { + // Send telemetry events every second + Thread.sleep(1000); + } else { + // Note: Update Device state less frequently than with telemetry events + Thread.sleep(5000); } - } finally { - // Disconnect the client and finish the run. + } + + // Disconnect the client if still connected, and finish the run. + if (client.isConnected()) { client.disconnect(); } + System.out.println("Finished loop successfully. Goodbye!"); // [END cloudiotcore_mqtt_publish] }