From 775c10123af2abaf1fe44a8923b952d5c2f54cfd Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 22 Oct 2018 16:09:15 -0400 Subject: [PATCH 1/3] CCR: Add TransportService closed to retryable errors Both testFollowIndexAndCloseNode and testFailOverOnFollower failed because they responded to the FollowTask a TransportService closed exception which is currently considered as a fatal error. This behavior is not desirable since a closing node can throw that exception, and we should retry in this case. This change adds TransportService closed error to the list of retryable errors. --- .../xpack/ccr/action/ShardFollowNodeTask.java | 11 +++++++++++ .../org/elasticsearch/xpack/ccr/IndexFollowingIT.java | 2 -- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 55d246fea4b31..863eb10a75a6d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -28,6 +28,8 @@ import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.transport.NodeNotConnectedException; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -369,6 +371,7 @@ private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable tas scheduler.accept(TimeValue.timeValueMillis(delay), task); } else { fatalException = ExceptionsHelper.convertToElastic(e); + LOGGER.warn("shard follow task encounter non-retryable error", e); } } @@ -387,6 +390,14 @@ static boolean shouldRetry(Exception e) { return true; } else if (NetworkExceptionHelper.isCloseConnectionException(e)) { return true; + } else { + final TransportException transportError = (TransportException) ExceptionsHelper.unwrap(e.getCause(), TransportException.class); + if (transportError != null) { + if (transportError instanceof NodeNotConnectedException || + (transportError.getMessage() != null && transportError.getMessage().contains("TransportService is closed"))) { + return true; + } + } } final Throwable actual = ExceptionsHelper.unwrapCause(e); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index ca6d2747c6cdb..48948abfa31d6 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -271,7 +271,6 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfShards); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/34696") public void testFollowIndexAndCloseNode() throws Exception { getFollowerCluster().ensureAtLeastNumDataNodes(3); String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); @@ -619,7 +618,6 @@ public void testUnfollowIndex() throws Exception { assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/34696") public void testFailOverOnFollower() throws Exception { int numberOfReplicas = between(1, 2); getFollowerCluster().startMasterOnlyNode(); From 194f74ac020ca18356cb3f5fbe1e67231c953a27 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 23 Oct 2018 08:34:50 -0400 Subject: [PATCH 2/3] reuse unwrap result --- .../xpack/ccr/action/ShardFollowNodeTask.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 863eb10a75a6d..823b8743a9818 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -28,8 +28,8 @@ import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.NodeNotConnectedException; -import org.elasticsearch.transport.TransportException; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -390,14 +390,6 @@ static boolean shouldRetry(Exception e) { return true; } else if (NetworkExceptionHelper.isCloseConnectionException(e)) { return true; - } else { - final TransportException transportError = (TransportException) ExceptionsHelper.unwrap(e.getCause(), TransportException.class); - if (transportError != null) { - if (transportError instanceof NodeNotConnectedException || - (transportError.getMessage() != null && transportError.getMessage().contains("TransportService is closed"))) { - return true; - } - } } final Throwable actual = ExceptionsHelper.unwrapCause(e); @@ -408,7 +400,10 @@ static boolean shouldRetry(Exception e) { actual instanceof AlreadyClosedException || actual instanceof ElasticsearchSecurityException || // If user does not have sufficient privileges actual instanceof ClusterBlockException || // If leader index is closed or no elected master - actual instanceof IndexClosedException; // If follow index is closed + actual instanceof IndexClosedException || // If follow index is closed + + actual instanceof NodeDisconnectedException || actual instanceof NodeNotConnectedException || + (actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")); } // These methods are protected for testing purposes: From 1f1350909d86d76d90bbf973fbe7ad5ff9e8881f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 23 Oct 2018 10:54:42 -0400 Subject: [PATCH 3/3] newline --- .../elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 823b8743a9818..6a92189f5626a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -401,8 +401,8 @@ static boolean shouldRetry(Exception e) { actual instanceof ElasticsearchSecurityException || // If user does not have sufficient privileges actual instanceof ClusterBlockException || // If leader index is closed or no elected master actual instanceof IndexClosedException || // If follow index is closed - - actual instanceof NodeDisconnectedException || actual instanceof NodeNotConnectedException || + actual instanceof NodeDisconnectedException || + actual instanceof NodeNotConnectedException || (actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")); }