Skip to content

Commit a4ed7b1

Browse files
authored
Decouple shard allocation awareness from search and get requests (#45735)
With this commit, Elasticsearch will no longer prefer using shards in the same location (with the same awareness attribute values) to process `_search` and `_get` requests. Instead, adaptive replica selection (the default since 7.0) should route requests more efficiently using the service time of prior inter-node communications. Clusters with big latencies between nodes should switch to cross cluster replication to isolate nodes within the same zone. Note that this change only targets 8.0 since it is considered as breaking. However a follow up pr should add an option to activate this behavior in 7.x in order to allow users to opt-in early. Closes #43453
1 parent c4fa32d commit a4ed7b1

File tree

5 files changed

+14
-171
lines changed

5 files changed

+14
-171
lines changed

docs/reference/migration/migrate_8_0/search.asciidoc

+10-2
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,24 @@ The `/{index}/{type}/_termvectors`, `/{index}/{type}/{id}/_termvectors` and `/{i
1212
[float]
1313
==== Removal of queries
1414

15-
The `common` query was deprecated in 7.x and has been removed in 8.0.
15+
The `common` query, deprecated in 7.x, has been removed in 8.0.
1616
The same functionality can be achieved by the `match` query if the total number of hits is not tracked.
1717

1818
[float]
1919
===== Removal of query parameters
2020

21-
The `cutoff_frequency` parameter was deprecated in 7.x and has been removed in 8.0 from `match` and `multi_match` queries.
21+
The `cutoff_frequency` parameter, deprecated in 7.x, has been removed in 8.0 from `match` and `multi_match` queries.
2222
The same functionality can be achieved without any configuration provided that the total number of hits is not tracked.
2323

2424
[float]
2525
===== Removal of sort parameters
2626

2727
The `nested_filter` and `nested_path` options, deprecated in 6.x, have been removed in favor of the `nested` context.
28+
29+
30+
[float]
31+
===== Shard allocation awareness in Search and Get requests
32+
33+
{es} will no longer prefer using shards in the same location (with the same awareness attribute values) to process
34+
`_search` and `_get` requests. Adaptive replica selection (activated by default in this version) will route requests
35+
more efficiently using the service time of prior inter-node communications.

docs/reference/modules/cluster/allocation_awareness.asciidoc

-4
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@ The allocation awareness settings can be configured in
1717
`elasticsearch.yml` and updated dynamically with the
1818
<<cluster-update-settings,cluster-update-settings>> API.
1919

20-
{es} prefers using shards in the same location (with the same
21-
awareness attribute values) to process search or GET requests. Using local
22-
shards is usually faster than crossing rack or zone boundaries.
23-
2420
NOTE: The number of attribute values determines how many shard copies are
2521
allocated in each location. If the number of nodes in each location is
2622
unbalanced and there are a lot of replicas, replica shards might be left

server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java

-88
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,11 @@
1919

2020
package org.elasticsearch.cluster.routing;
2121

22-
import org.elasticsearch.cluster.node.DiscoveryNode;
2322
import org.elasticsearch.cluster.node.DiscoveryNodes;
2423
import org.elasticsearch.common.Nullable;
2524
import org.elasticsearch.common.Randomness;
2625
import org.elasticsearch.common.io.stream.StreamInput;
2726
import org.elasticsearch.common.io.stream.StreamOutput;
28-
import org.elasticsearch.common.util.Maps;
2927
import org.elasticsearch.common.util.set.Sets;
3028
import org.elasticsearch.index.Index;
3129
import org.elasticsearch.index.shard.ShardId;
@@ -44,8 +42,6 @@
4442
import java.util.Optional;
4543
import java.util.Set;
4644

47-
import static java.util.Collections.emptyMap;
48-
4945
/**
5046
* {@link IndexShardRoutingTable} encapsulates all instances of a single shard.
5147
* Each Elasticsearch index consists of multiple shards, each shard encapsulates
@@ -67,10 +63,6 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
6763
final Set<String> allAllocationIds;
6864
final boolean allShardsStarted;
6965

70-
private volatile Map<AttributesKey, AttributesRoutings> activeShardsByAttributes = emptyMap();
71-
private volatile Map<AttributesKey, AttributesRoutings> initializingShardsByAttributes = emptyMap();
72-
private final Object shardsByAttributeMutex = new Object();
73-
7466
/**
7567
* The initializing list, including ones that are initializing on a target node because of relocation.
7668
* If we can come up with a better variable name, it would be nice...
@@ -549,86 +541,6 @@ public boolean equals(Object obj) {
549541
}
550542
}
551543

552-
static class AttributesRoutings {
553-
554-
public final List<ShardRouting> withSameAttribute;
555-
public final List<ShardRouting> withoutSameAttribute;
556-
public final int totalSize;
557-
558-
AttributesRoutings(List<ShardRouting> withSameAttribute, List<ShardRouting> withoutSameAttribute) {
559-
this.withSameAttribute = withSameAttribute;
560-
this.withoutSameAttribute = withoutSameAttribute;
561-
this.totalSize = withoutSameAttribute.size() + withSameAttribute.size();
562-
}
563-
}
564-
565-
private AttributesRoutings getActiveAttribute(AttributesKey key, DiscoveryNodes nodes) {
566-
AttributesRoutings shardRoutings = activeShardsByAttributes.get(key);
567-
if (shardRoutings == null) {
568-
synchronized (shardsByAttributeMutex) {
569-
ArrayList<ShardRouting> from = new ArrayList<>(activeShards);
570-
List<ShardRouting> to = collectAttributeShards(key, nodes, from);
571-
572-
shardRoutings = new AttributesRoutings(to, Collections.unmodifiableList(from));
573-
activeShardsByAttributes = Maps.copyMapWithAddedEntry(activeShardsByAttributes, key, shardRoutings);
574-
}
575-
}
576-
return shardRoutings;
577-
}
578-
579-
private AttributesRoutings getInitializingAttribute(AttributesKey key, DiscoveryNodes nodes) {
580-
AttributesRoutings shardRoutings = initializingShardsByAttributes.get(key);
581-
if (shardRoutings == null) {
582-
synchronized (shardsByAttributeMutex) {
583-
ArrayList<ShardRouting> from = new ArrayList<>(allInitializingShards);
584-
List<ShardRouting> to = collectAttributeShards(key, nodes, from);
585-
shardRoutings = new AttributesRoutings(to, Collections.unmodifiableList(from));
586-
initializingShardsByAttributes =
587-
Maps.copyMapWithAddedEntry(initializingShardsByAttributes, key, shardRoutings);
588-
}
589-
}
590-
return shardRoutings;
591-
}
592-
593-
private static List<ShardRouting> collectAttributeShards(AttributesKey key, DiscoveryNodes nodes, ArrayList<ShardRouting> from) {
594-
final ArrayList<ShardRouting> to = new ArrayList<>();
595-
for (final String attribute : key.attributes) {
596-
final String localAttributeValue = nodes.getLocalNode().getAttributes().get(attribute);
597-
if (localAttributeValue != null) {
598-
for (Iterator<ShardRouting> iterator = from.iterator(); iterator.hasNext(); ) {
599-
ShardRouting fromShard = iterator.next();
600-
final DiscoveryNode discoveryNode = nodes.get(fromShard.currentNodeId());
601-
if (discoveryNode == null) {
602-
iterator.remove(); // node is not present anymore - ignore shard
603-
} else if (localAttributeValue.equals(discoveryNode.getAttributes().get(attribute))) {
604-
iterator.remove();
605-
to.add(fromShard);
606-
}
607-
}
608-
}
609-
}
610-
return Collections.unmodifiableList(to);
611-
}
612-
613-
public ShardIterator preferAttributesActiveInitializingShardsIt(List<String> attributes, DiscoveryNodes nodes) {
614-
return preferAttributesActiveInitializingShardsIt(attributes, nodes, shuffler.nextSeed());
615-
}
616-
617-
public ShardIterator preferAttributesActiveInitializingShardsIt(List<String> attributes, DiscoveryNodes nodes, int seed) {
618-
AttributesKey key = new AttributesKey(attributes);
619-
AttributesRoutings activeRoutings = getActiveAttribute(key, nodes);
620-
AttributesRoutings initializingRoutings = getInitializingAttribute(key, nodes);
621-
622-
// we now randomize, once between the ones that have the same attributes, and once for the ones that don't
623-
// we don't want to mix between the two!
624-
ArrayList<ShardRouting> ordered = new ArrayList<>(activeRoutings.totalSize + initializingRoutings.totalSize);
625-
ordered.addAll(shuffler.shuffle(activeRoutings.withSameAttribute, seed));
626-
ordered.addAll(shuffler.shuffle(activeRoutings.withoutSameAttribute, seed));
627-
ordered.addAll(shuffler.shuffle(initializingRoutings.withSameAttribute, seed));
628-
ordered.addAll(shuffler.shuffle(initializingRoutings.withoutSameAttribute, seed));
629-
return new PlainShardIterator(shardId, ordered);
630-
}
631-
632544
public ShardRouting primaryShard() {
633545
return primary;
634546
}

server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

+4-22
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.cluster.ClusterState;
2323
import org.elasticsearch.cluster.metadata.IndexMetaData;
2424
import org.elasticsearch.cluster.node.DiscoveryNodes;
25-
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
2625
import org.elasticsearch.common.Nullable;
2726
import org.elasticsearch.common.Strings;
2827
import org.elasticsearch.common.settings.ClusterSettings;
@@ -36,7 +35,6 @@
3635
import java.util.Arrays;
3736
import java.util.Collections;
3837
import java.util.HashSet;
39-
import java.util.List;
4038
import java.util.Map;
4139
import java.util.Set;
4240
import java.util.stream.Collectors;
@@ -47,25 +45,17 @@ public class OperationRouting {
4745
Setting.boolSetting("cluster.routing.use_adaptive_replica_selection", true,
4846
Setting.Property.Dynamic, Setting.Property.NodeScope);
4947

50-
private List<String> awarenessAttributes;
5148
private boolean useAdaptiveReplicaSelection;
5249

5350
public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
54-
this.awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
5551
this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings);
56-
clusterSettings.addSettingsUpdateConsumer(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
57-
this::setAwarenessAttributes);
5852
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
5953
}
6054

6155
void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
6256
this.useAdaptiveReplicaSelection = useAdaptiveReplicaSelection;
6357
}
6458

65-
private void setAwarenessAttributes(List<String> awarenessAttributes) {
66-
this.awarenessAttributes = awarenessAttributes;
67-
}
68-
6959
public ShardIterator indexShards(ClusterState clusterState, String index, String id, @Nullable String routing) {
7060
return shards(clusterState, index, id, routing).shardsIt();
7161
}
@@ -194,23 +184,15 @@ private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable index
194184
}
195185
// if not, then use it as the index
196186
int routingHash = 31 * Murmur3HashFunction.hash(preference) + indexShard.shardId.hashCode();
197-
if (awarenessAttributes.isEmpty()) {
198-
return indexShard.activeInitializingShardsIt(routingHash);
199-
} else {
200-
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);
201-
}
187+
return indexShard.activeInitializingShardsIt(routingHash);
202188
}
203189

204190
private ShardIterator shardRoutings(IndexShardRoutingTable indexShard, DiscoveryNodes nodes,
205191
@Nullable ResponseCollectorService collectorService, @Nullable Map<String, Long> nodeCounts) {
206-
if (awarenessAttributes.isEmpty()) {
207-
if (useAdaptiveReplicaSelection) {
208-
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
209-
} else {
210-
return indexShard.activeInitializingShardsRandomIt();
211-
}
192+
if (useAdaptiveReplicaSelection) {
193+
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
212194
} else {
213-
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
195+
return indexShard.activeInitializingShardsRandomIt();
214196
}
215197
}
216198

server/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java

-55
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,8 @@
4141
import org.elasticsearch.common.settings.Settings;
4242
import org.elasticsearch.index.shard.ShardId;
4343

44-
import java.util.Arrays;
4544
import java.util.Collections;
4645
import java.util.Iterator;
47-
import java.util.Map;
4846

4947
import static java.util.Collections.singletonMap;
5048
import static org.hamcrest.Matchers.anyOf;
@@ -224,59 +222,6 @@ public void testRandomRouting() {
224222
assertThat(shardRouting1, sameInstance(shardRouting3));
225223
}
226224

227-
public void testAttributePreferenceRouting() {
228-
Settings.Builder settings = Settings.builder()
229-
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
230-
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always");
231-
if (randomBoolean()) {
232-
settings.put("cluster.routing.allocation.awareness.attributes", " rack_id, zone ");
233-
} else {
234-
settings.putList("cluster.routing.allocation.awareness.attributes", "rack_id", "zone");
235-
}
236-
237-
AllocationService strategy = createAllocationService(settings.build());
238-
239-
MetaData metaData = MetaData.builder()
240-
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
241-
.build();
242-
243-
RoutingTable routingTable = RoutingTable.builder()
244-
.addAsNew(metaData.index("test"))
245-
.build();
246-
247-
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING
248-
.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build();
249-
250-
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
251-
.add(newNode("node1", Map.of("rack_id", "rack_1", "zone", "zone1")))
252-
.add(newNode("node2", Map.of("rack_id", "rack_2", "zone", "zone2")))
253-
.localNodeId("node1")
254-
).build();
255-
clusterState = strategy.reroute(clusterState, "reroute");
256-
257-
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
258-
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
259-
260-
// after all are started, check routing iteration
261-
ShardIterator shardIterator = clusterState.routingTable().index("test").shard(0)
262-
.preferAttributesActiveInitializingShardsIt(Arrays.asList("rack_id"), clusterState.nodes());
263-
ShardRouting shardRouting = shardIterator.nextOrNull();
264-
assertThat(shardRouting, notNullValue());
265-
assertThat(shardRouting.currentNodeId(), equalTo("node1"));
266-
shardRouting = shardIterator.nextOrNull();
267-
assertThat(shardRouting, notNullValue());
268-
assertThat(shardRouting.currentNodeId(), equalTo("node2"));
269-
270-
shardIterator = clusterState.routingTable().index("test").shard(0)
271-
.preferAttributesActiveInitializingShardsIt(Arrays.asList("rack_id"), clusterState.nodes());
272-
shardRouting = shardIterator.nextOrNull();
273-
assertThat(shardRouting, notNullValue());
274-
assertThat(shardRouting.currentNodeId(), equalTo("node1"));
275-
shardRouting = shardIterator.nextOrNull();
276-
assertThat(shardRouting, notNullValue());
277-
assertThat(shardRouting.currentNodeId(), equalTo("node2"));
278-
}
279-
280225
public void testNodeSelectorRouting(){
281226
AllocationService strategy = createAllocationService(Settings.builder()
282227
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)

0 commit comments

Comments
 (0)