From 71d49e9a9900ef123b4032126387df9d3e589511 Mon Sep 17 00:00:00 2001 From: jimczi Date: Tue, 20 Aug 2019 15:02:51 +0200 Subject: [PATCH] Decouple shard allocation awareness from search and get requests With this commit, Elasticsearch will no longer prefer using shards in the same location (with the same awareness attribute values) to process `_search` and `_get` requests. Instead, adaptive replica selection (the default since 7.0) should route requests more efficiently using the service time of prior inter-node communications. Clusters with big latencies between nodes should switch to cross cluster replication to isolate nodes within the same zone. Note that this change only targets 8.0 since it is considered as breaking. However a follow up pr should add an option to activate this behavior in 7.x in order to allow users to opt-in early. Closes #43453 --- .../migration/migrate_8_0/search.asciidoc | 12 ++- .../cluster/allocation_awareness.asciidoc | 4 - .../routing/IndexShardRoutingTable.java | 88 ------------------- .../cluster/routing/OperationRouting.java | 26 +----- .../structure/RoutingIteratorTests.java | 55 ------------ 5 files changed, 14 insertions(+), 171 deletions(-) diff --git a/docs/reference/migration/migrate_8_0/search.asciidoc b/docs/reference/migration/migrate_8_0/search.asciidoc index 97796a10fca22..4c3be2b4fadd0 100644 --- a/docs/reference/migration/migrate_8_0/search.asciidoc +++ b/docs/reference/migration/migrate_8_0/search.asciidoc @@ -12,16 +12,24 @@ The `/{index}/{type}/_termvectors`, `/{index}/{type}/{id}/_termvectors` and `/{i [float] ==== Removal of queries -The `common` query was deprecated in 7.x and has been removed in 8.0. +The `common` query, deprecated in 7.x, has been removed in 8.0. The same functionality can be achieved by the `match` query if the total number of hits is not tracked. [float] ===== Removal of query parameters -The `cutoff_frequency` parameter was deprecated in 7.x and has been removed in 8.0 from `match` and `multi_match` queries. +The `cutoff_frequency` parameter, deprecated in 7.x, has been removed in 8.0 from `match` and `multi_match` queries. The same functionality can be achieved without any configuration provided that the total number of hits is not tracked. [float] ===== Removal of sort parameters The `nested_filter` and `nested_path` options, deprecated in 6.x, have been removed in favor of the `nested` context. + + +[float] +===== Shard allocation awareness in Search and Get requests + +{es} will no longer prefer using shards in the same location (with the same awareness attribute values) to process +`_search` and `_get` requests. Adaptive replica selection (activated by default in this version) will route requests +more efficiently using the service time of prior inter-node communications. \ No newline at end of file diff --git a/docs/reference/modules/cluster/allocation_awareness.asciidoc b/docs/reference/modules/cluster/allocation_awareness.asciidoc index 5fc9197d449d5..2d81be8a87ecd 100644 --- a/docs/reference/modules/cluster/allocation_awareness.asciidoc +++ b/docs/reference/modules/cluster/allocation_awareness.asciidoc @@ -17,10 +17,6 @@ The allocation awareness settings can be configured in `elasticsearch.yml` and updated dynamically with the <> API. -{es} prefers using shards in the same location (with the same -awareness attribute values) to process search or GET requests. Using local -shards is usually faster than crossing rack or zone boundaries. - NOTE: The number of attribute values determines how many shard copies are allocated in each location. If the number of nodes in each location is unbalanced and there are a lot of replicas, replica shards might be left diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 5a98e9456f43d..9fe807c394282 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -19,13 +19,11 @@ package org.elasticsearch.cluster.routing; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -44,8 +42,6 @@ import java.util.Optional; import java.util.Set; -import static java.util.Collections.emptyMap; - /** * {@link IndexShardRoutingTable} encapsulates all instances of a single shard. * Each Elasticsearch index consists of multiple shards, each shard encapsulates @@ -67,10 +63,6 @@ public class IndexShardRoutingTable implements Iterable { final Set allAllocationIds; final boolean allShardsStarted; - private volatile Map activeShardsByAttributes = emptyMap(); - private volatile Map initializingShardsByAttributes = emptyMap(); - private final Object shardsByAttributeMutex = new Object(); - /** * The initializing list, including ones that are initializing on a target node because of relocation. * If we can come up with a better variable name, it would be nice... @@ -549,86 +541,6 @@ public boolean equals(Object obj) { } } - static class AttributesRoutings { - - public final List withSameAttribute; - public final List withoutSameAttribute; - public final int totalSize; - - AttributesRoutings(List withSameAttribute, List withoutSameAttribute) { - this.withSameAttribute = withSameAttribute; - this.withoutSameAttribute = withoutSameAttribute; - this.totalSize = withoutSameAttribute.size() + withSameAttribute.size(); - } - } - - private AttributesRoutings getActiveAttribute(AttributesKey key, DiscoveryNodes nodes) { - AttributesRoutings shardRoutings = activeShardsByAttributes.get(key); - if (shardRoutings == null) { - synchronized (shardsByAttributeMutex) { - ArrayList from = new ArrayList<>(activeShards); - List to = collectAttributeShards(key, nodes, from); - - shardRoutings = new AttributesRoutings(to, Collections.unmodifiableList(from)); - activeShardsByAttributes = Maps.copyMapWithAddedEntry(activeShardsByAttributes, key, shardRoutings); - } - } - return shardRoutings; - } - - private AttributesRoutings getInitializingAttribute(AttributesKey key, DiscoveryNodes nodes) { - AttributesRoutings shardRoutings = initializingShardsByAttributes.get(key); - if (shardRoutings == null) { - synchronized (shardsByAttributeMutex) { - ArrayList from = new ArrayList<>(allInitializingShards); - List to = collectAttributeShards(key, nodes, from); - shardRoutings = new AttributesRoutings(to, Collections.unmodifiableList(from)); - initializingShardsByAttributes = - Maps.copyMapWithAddedEntry(initializingShardsByAttributes, key, shardRoutings); - } - } - return shardRoutings; - } - - private static List collectAttributeShards(AttributesKey key, DiscoveryNodes nodes, ArrayList from) { - final ArrayList to = new ArrayList<>(); - for (final String attribute : key.attributes) { - final String localAttributeValue = nodes.getLocalNode().getAttributes().get(attribute); - if (localAttributeValue != null) { - for (Iterator iterator = from.iterator(); iterator.hasNext(); ) { - ShardRouting fromShard = iterator.next(); - final DiscoveryNode discoveryNode = nodes.get(fromShard.currentNodeId()); - if (discoveryNode == null) { - iterator.remove(); // node is not present anymore - ignore shard - } else if (localAttributeValue.equals(discoveryNode.getAttributes().get(attribute))) { - iterator.remove(); - to.add(fromShard); - } - } - } - } - return Collections.unmodifiableList(to); - } - - public ShardIterator preferAttributesActiveInitializingShardsIt(List attributes, DiscoveryNodes nodes) { - return preferAttributesActiveInitializingShardsIt(attributes, nodes, shuffler.nextSeed()); - } - - public ShardIterator preferAttributesActiveInitializingShardsIt(List attributes, DiscoveryNodes nodes, int seed) { - AttributesKey key = new AttributesKey(attributes); - AttributesRoutings activeRoutings = getActiveAttribute(key, nodes); - AttributesRoutings initializingRoutings = getInitializingAttribute(key, nodes); - - // we now randomize, once between the ones that have the same attributes, and once for the ones that don't - // we don't want to mix between the two! - ArrayList ordered = new ArrayList<>(activeRoutings.totalSize + initializingRoutings.totalSize); - ordered.addAll(shuffler.shuffle(activeRoutings.withSameAttribute, seed)); - ordered.addAll(shuffler.shuffle(activeRoutings.withoutSameAttribute, seed)); - ordered.addAll(shuffler.shuffle(initializingRoutings.withSameAttribute, seed)); - ordered.addAll(shuffler.shuffle(initializingRoutings.withoutSameAttribute, seed)); - return new PlainShardIterator(shardId, ordered); - } - public ShardRouting primaryShard() { return primary; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 9d0a081af4cbf..6d9397db3b377 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -22,7 +22,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; @@ -36,7 +35,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -47,14 +45,10 @@ public class OperationRouting { Setting.boolSetting("cluster.routing.use_adaptive_replica_selection", true, Setting.Property.Dynamic, Setting.Property.NodeScope); - private List awarenessAttributes; private boolean useAdaptiveReplicaSelection; public OperationRouting(Settings settings, ClusterSettings clusterSettings) { - this.awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings); - clusterSettings.addSettingsUpdateConsumer(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, - this::setAwarenessAttributes); clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection); } @@ -62,10 +56,6 @@ void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { this.useAdaptiveReplicaSelection = useAdaptiveReplicaSelection; } - private void setAwarenessAttributes(List awarenessAttributes) { - this.awarenessAttributes = awarenessAttributes; - } - public ShardIterator indexShards(ClusterState clusterState, String index, String id, @Nullable String routing) { return shards(clusterState, index, id, routing).shardsIt(); } @@ -194,23 +184,15 @@ private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable index } // if not, then use it as the index int routingHash = 31 * Murmur3HashFunction.hash(preference) + indexShard.shardId.hashCode(); - if (awarenessAttributes.isEmpty()) { - return indexShard.activeInitializingShardsIt(routingHash); - } else { - return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash); - } + return indexShard.activeInitializingShardsIt(routingHash); } private ShardIterator shardRoutings(IndexShardRoutingTable indexShard, DiscoveryNodes nodes, @Nullable ResponseCollectorService collectorService, @Nullable Map nodeCounts) { - if (awarenessAttributes.isEmpty()) { - if (useAdaptiveReplicaSelection) { - return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); - } else { - return indexShard.activeInitializingShardsRandomIt(); - } + if (useAdaptiveReplicaSelection) { + return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); } else { - return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes); + return indexShard.activeInitializingShardsRandomIt(); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java b/server/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java index 993a8ce408b4a..51f1b52e9f3ee 100644 --- a/server/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java @@ -41,10 +41,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; -import java.util.Arrays; import java.util.Collections; import java.util.Iterator; -import java.util.Map; import static java.util.Collections.singletonMap; import static org.hamcrest.Matchers.anyOf; @@ -224,59 +222,6 @@ public void testRandomRouting() { assertThat(shardRouting1, sameInstance(shardRouting3)); } - public void testAttributePreferenceRouting() { - Settings.Builder settings = Settings.builder() - .put("cluster.routing.allocation.node_concurrent_recoveries", 10) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always"); - if (randomBoolean()) { - settings.put("cluster.routing.allocation.awareness.attributes", " rack_id, zone "); - } else { - settings.putList("cluster.routing.allocation.awareness.attributes", "rack_id", "zone"); - } - - AllocationService strategy = createAllocationService(settings.build()); - - MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) - .build(); - - RoutingTable routingTable = RoutingTable.builder() - .addAsNew(metaData.index("test")) - .build(); - - ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING - .getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build(); - - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() - .add(newNode("node1", Map.of("rack_id", "rack_1", "zone", "zone1"))) - .add(newNode("node2", Map.of("rack_id", "rack_2", "zone", "zone2"))) - .localNodeId("node1") - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); - - clusterState = startInitializingShardsAndReroute(strategy, clusterState); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); - - // after all are started, check routing iteration - ShardIterator shardIterator = clusterState.routingTable().index("test").shard(0) - .preferAttributesActiveInitializingShardsIt(Arrays.asList("rack_id"), clusterState.nodes()); - ShardRouting shardRouting = shardIterator.nextOrNull(); - assertThat(shardRouting, notNullValue()); - assertThat(shardRouting.currentNodeId(), equalTo("node1")); - shardRouting = shardIterator.nextOrNull(); - assertThat(shardRouting, notNullValue()); - assertThat(shardRouting.currentNodeId(), equalTo("node2")); - - shardIterator = clusterState.routingTable().index("test").shard(0) - .preferAttributesActiveInitializingShardsIt(Arrays.asList("rack_id"), clusterState.nodes()); - shardRouting = shardIterator.nextOrNull(); - assertThat(shardRouting, notNullValue()); - assertThat(shardRouting.currentNodeId(), equalTo("node1")); - shardRouting = shardIterator.nextOrNull(); - assertThat(shardRouting, notNullValue()); - assertThat(shardRouting.currentNodeId(), equalTo("node2")); - } - public void testNodeSelectorRouting(){ AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10)