From 4fbf36ba86f2c54c2be09aa839748ab93559db18 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 9 Oct 2019 16:26:42 +0200 Subject: [PATCH 1/2] Do not auto-follow closed indices (#47721) Backport of (#47721) for 7.x. Similarly to #47582, Auto-follow patterns creates following indices as long as the remote index matches the pattern and the remote primary shards are all started. But since 7.2 closed indices are also replicated, and it does not play well with CCR auto-follow patterns as they create following indices for closed leader indices too. This commit changes the getLeaderIndicesToFollow() so that closed indices are excluded from auto-follow patterns. --- .../ccr/action/AutoFollowCoordinator.java | 3 + .../action/AutoFollowCoordinatorTests.java | 120 +++++++++++++++++- 2 files changed, 119 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 13dc84b858243..cca60579ae6f9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -596,6 +596,9 @@ static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, List followedIndexUUIDs) { List leaderIndicesToFollow = new ArrayList<>(); for (IndexMetaData leaderIndexMetaData : remoteClusterState.getMetaData()) { + if (leaderIndexMetaData.getState() != IndexMetaData.State.OPEN) { + continue; + } if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) { IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetaData.getIndex()); if (indexRoutingTable != null && diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 75606699b1159..a80b2ae71349a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -5,8 +5,10 @@ */ package org.elasticsearch.xpack.ccr.action; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -18,11 +20,13 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; @@ -44,10 +48,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -57,6 +63,7 @@ import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -416,6 +423,26 @@ public void testGetLeaderIndicesToFollow_shardsNotStarted() { assertThat(result.get(1).getName(), equalTo("index2")); } + public void testGetLeaderIndicesToFollowWithClosedIndices() { + final AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"), + null, null, null, null, null, null, null, null, null, null, null); + + // index is opened + ClusterState remoteState = ClusterStateCreationUtils.stateWithActivePrimary("test-index", true, randomIntBetween(1, 3), 0); + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList()); + assertThat(result.size(), equalTo(1)); + assertThat(result, hasItem(remoteState.metaData().index("test-index").getIndex())); + + // index is closed + remoteState = ClusterState.builder(remoteState) + .metaData(MetaData.builder(remoteState.metaData()) + .put(IndexMetaData.builder(remoteState.metaData().index("test-index")).state(IndexMetaData.State.CLOSE).build(), true) + .build()) + .build(); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList()); + assertThat(result.size(), equalTo(0)); + } + public void testRecordLeaderIndexAsFollowFunction() { AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(), Collections.singletonMap("pattern1", Collections.emptyList()), Collections.emptyMap()); @@ -763,7 +790,9 @@ void updateAutoFollowMetadata(Function updateFunctio autoFollower.start(); assertThat(allResults.size(), equalTo(states.length)); for (int i = 0; i < states.length; i++) { - assertThat(allResults.get(i).autoFollowExecutionResults.containsKey(new Index("logs-" + i, "_na_")), is(true)); + final String indexName = "logs-" + i; + assertThat(allResults.get(i).autoFollowExecutionResults.keySet().stream() + .anyMatch(index -> index.getName().equals(indexName)), is(true)); } } @@ -1049,6 +1078,87 @@ void updateAutoFollowMetadata( } } + public void testClosedIndicesAreNotAutoFollowed() { + final Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + final String pattern = "pattern1"; + final ClusterState localState = ClusterState.builder(new ClusterName("local")) + .metaData(MetaData.builder() + .putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(Collections.singletonMap(pattern, + new AutoFollowPattern("remote", Collections.singletonList("docs-*"), null, null, null, null, null, null, null, null, + null, null, null)), + Collections.singletonMap(pattern, Collections.emptyList()), + Collections.singletonMap(pattern, Collections.emptyMap())))) + .build(); + + ClusterState remoteState = null; + final int nbLeaderIndices = randomInt(15); + for (int i = 0; i < nbLeaderIndices; i++) { + String indexName = "docs-" + i; + if (remoteState == null) { + remoteState = createRemoteClusterState(indexName, true); + } else { + remoteState = createRemoteClusterState(remoteState, indexName); + } + if (randomBoolean()) { + // randomly close the index + remoteState = ClusterState.builder(remoteState.getClusterName()) + .routingTable(remoteState.routingTable()) + .metaData(MetaData.builder(remoteState.metaData()) + .put(IndexMetaData.builder(remoteState.metaData().index(indexName)).state(IndexMetaData.State.CLOSE).build(), true) + .build()) + .build(); + } + } + + final ClusterState finalRemoteState = remoteState; + final AtomicReference lastModifiedClusterState = new AtomicReference<>(localState); + final List results = new ArrayList<>(); + final Set followedIndices = ConcurrentCollections.newConcurrentSet(); + final AutoFollower autoFollower = + new AutoFollower("remote", results::addAll, localClusterStateSupplier(localState), () -> 1L, Runnable::run) { + @Override + void getRemoteClusterState(String remoteCluster, + long metadataVersion, + BiConsumer handler) { + assertThat(remoteCluster, equalTo("remote")); + handler.accept(new ClusterStateResponse(new ClusterName("remote"), finalRemoteState, false), null); + } + + @Override + void createAndFollow(Map headers, + PutFollowAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { + followedIndices.add(followRequest.getLeaderIndex()); + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + lastModifiedClusterState.updateAndGet(updateFunction::apply); + handler.accept(null); + } + + @Override + void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List patterns) { + // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice + } + }; + autoFollower.start(); + + assertThat(results, notNullValue()); + assertThat(results.size(), equalTo(1)); + + for (ObjectObjectCursor index : remoteState.metaData().indices()) { + boolean expect = index.value.getState() == IndexMetaData.State.OPEN; + assertThat(results.get(0).autoFollowExecutionResults.containsKey(index.value.getIndex()), is(expect)); + assertThat(followedIndices.contains(index.key), is(expect)); + } + } + private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes) { Settings.Builder indexSettings; if (enableSoftDeletes != null) { @@ -1075,11 +1185,13 @@ private static ClusterState createRemoteClusterState(String indexName, Boolean e private static ClusterState createRemoteClusterState(ClusterState previous, String indexName) { IndexMetaData indexMetaData = IndexMetaData.builder(indexName) - .settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)) + .settings(settings(Version.CURRENT) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))) .numberOfShards(1) .numberOfReplicas(0) .build(); - ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote")) + ClusterState.Builder csBuilder = ClusterState.builder(previous.getClusterName()) .metaData(MetaData.builder(previous.metaData()) .version(previous.metaData().version() + 1) .put(indexMetaData, true)); @@ -1087,7 +1199,7 @@ private static ClusterState createRemoteClusterState(ClusterState previous, Stri ShardRouting shardRouting = TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted(); IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex()).addShard(shardRouting).build(); - csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build(); + csBuilder.routingTable(RoutingTable.builder(previous.routingTable()).add(indexRoutingTable).build()).build(); return csBuilder.build(); } From 43962ecdc2ebaffd79615451b28d51ab13bbda5d Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 9 Oct 2019 17:17:30 +0200 Subject: [PATCH 2/2] Fix AutoFollowCoordinatorTests --- .../xpack/ccr/action/AutoFollowCoordinatorTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index a80b2ae71349a..e3edf0489d745 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -1094,7 +1094,7 @@ public void testClosedIndicesAreNotAutoFollowed() { .build(); ClusterState remoteState = null; - final int nbLeaderIndices = randomInt(15); + final int nbLeaderIndices = randomIntBetween(1, 15); for (int i = 0; i < nbLeaderIndices; i++) { String indexName = "docs-" + i; if (remoteState == null) {