Skip to content

Adds exponential backoff to MQTT #1018

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 9, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public static void main(String[] args) throws Exception {
options.projectId, options.cloudRegion, options.registryId, options.deviceId);

MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that the the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explictly set this. If you don't set MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
Expand All @@ -179,66 +179,101 @@ public static void main(String[] args) throws Exception {
// [START cloudiotcore_mqtt_publish]
// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());
try {
client.connect(connectOptions);
attachCallback(client, options.deviceId);

// Publish to the events or state topic based on the flag.
String subTopic = options.messageType.equals("event") ? "events" : options.messageType;

// The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
// required to be in the format below. Note that this is not the same as the device registry's
// Cloud Pub/Sub topic.
String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);

// Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
for (int i = 1; i <= options.numMessages; ++i) {
String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
System.out.format(
"Publishing %s message %d/%d: '%s'\n",
options.messageType, i, options.numMessages, payload);

// Refresh the connection credentials before the JWT expires.
// [START cloudiotcore_mqtt_jwt_refresh]
long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000;
if (secsSinceRefresh > (options.tokenExpMins * 60)) {
System.out.format("\tRefreshing token after: %d seconds\n", secsSinceRefresh);
iat = new DateTime();
if (options.algorithm.equals("RS256")) {
connectOptions.setPassword(
createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
} else if (options.algorithm.equals("ES256")) {
connectOptions.setPassword(
createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
} else {
throw new IllegalArgumentException(
"Invalid algorithm " + options.algorithm
+ ". Should be one of 'RS256' or 'ES256'.");

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while (!client.isConnected() && totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) {
try {
client.connect(connectOptions);
} catch (MqttException e) {
int reason = e.getReasonCode();

// If the connection is lost or if the server cannot be connected, allow retries, but with
// exponential backoff.
System.out.println("An error occurred: " + e.getMessage());
if (reason == MqttException.REASON_CODE_CONNECTION_LOST
|| reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
Thread.sleep(retryIntervalMs);
totalRetryTimeMs += retryIntervalMs;
retryIntervalMs *= intervalMultiplier;
if (retryIntervalMs > maxConnectIntervalMillis) {
retryIntervalMs = maxConnectIntervalMillis;
}
client.disconnect();
client.connect();
attachCallback(client, options.deviceId);
} else {
throw e;
}
// [END cloudiotcore_mqtt_jwt_refresh]
}
}

attachCallback(client, options.deviceId);

// Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
// also supports qos=0 for at most once delivery.
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(1);
client.publish(mqttTopic, message);
// Publish to the events or state topic based on the flag.
String subTopic = options.messageType.equals("event") ? "events" : options.messageType;

if (options.messageType.equals("event")) {
// Send telemetry events every second
Thread.sleep(1000);
// The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
// required to be in the format below. Note that this is not the same as the device registry's
// Cloud Pub/Sub topic.
String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);

// Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
for (int i = 1; i <= options.numMessages; ++i) {
String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
System.out.format(
"Publishing %s message %d/%d: '%s'\n",
options.messageType, i, options.numMessages, payload);

// Refresh the connection credentials before the JWT expires.
// [START cloudiotcore_mqtt_jwt_refresh]
long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000;
if (secsSinceRefresh > (options.tokenExpMins * 60)) {
System.out.format("\tRefreshing token after: %d seconds\n", secsSinceRefresh);
iat = new DateTime();
if (options.algorithm.equals("RS256")) {
connectOptions.setPassword(
createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
} else if (options.algorithm.equals("ES256")) {
connectOptions.setPassword(
createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
} else {
// Note: Update Device state less frequently than with telemetry events
Thread.sleep(5000);
throw new IllegalArgumentException(
"Invalid algorithm " + options.algorithm
+ ". Should be one of 'RS256' or 'ES256'.");
}
client.disconnect();
client.connect();
attachCallback(client, options.deviceId);
}
// [END cloudiotcore_mqtt_jwt_refresh]

// Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
// also supports qos=0 for at most once delivery.
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(1);
client.publish(mqttTopic, message);

if (options.messageType.equals("event")) {
// Send telemetry events every second
Thread.sleep(1000);
} else {
// Note: Update Device state less frequently than with telemetry events
Thread.sleep(5000);
}
} finally {
// Disconnect the client and finish the run.
}

// Disconnect the client if still connected, and finish the run.
if (client.isConnected()) {
client.disconnect();
}

System.out.println("Finished loop successfully. Goodbye!");
// [END cloudiotcore_mqtt_publish]
}
Expand Down