Skip to content

Commit f04468e

Browse files
Search during index create
When creating an index, the primary will be either unassigned or initializing for a short while. This causes issues when concurrent processes does searches hitting those indices, either explicitly, through patterns, aliases or data streams. This commit changes the behavior to disregard shards where the primary is inactive due to having just been created. Closes elastic#65846
1 parent 22bc833 commit f04468e

File tree

3 files changed

+208
-5
lines changed

3 files changed

+208
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.basic;
21+
22+
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.action.ActionFuture;
24+
import org.elasticsearch.action.NoShardAvailableActionException;
25+
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
26+
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
27+
import org.elasticsearch.action.get.GetRequest;
28+
import org.elasticsearch.action.search.SearchResponse;
29+
import org.elasticsearch.cluster.metadata.IndexMetadata;
30+
import org.elasticsearch.common.settings.Settings;
31+
import org.elasticsearch.common.unit.TimeValue;
32+
import org.elasticsearch.index.IndexNotFoundException;
33+
import org.elasticsearch.plugins.Plugin;
34+
import org.elasticsearch.test.ESIntegTestCase;
35+
import org.elasticsearch.test.disruption.NetworkDisruption;
36+
import org.elasticsearch.test.transport.MockTransportService;
37+
38+
import java.util.Collection;
39+
import java.util.Collections;
40+
import java.util.Set;
41+
42+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
43+
import static org.hamcrest.Matchers.anyOf;
44+
import static org.hamcrest.Matchers.equalTo;
45+
import static org.hamcrest.Matchers.instanceOf;
46+
47+
48+
/**
49+
* This integration test contains two tests to provoke a search failure while initializing a new empty index:
50+
* <ul>
51+
* <li>testSearchDuringCreate: just tries to create an index and search, has low likelihood for provoking issue (but it does happen)
52+
* </li>
53+
* <li>testDelayIsolatedPrimary: delays network messages from all nodes except search coordinator to primary, ensuring that every test
54+
* run hits the case where a primary is initializing a newly created shard</i>
55+
* </ul>
56+
*/
57+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
58+
public class SearchWhileInitializingEmptyIT extends ESIntegTestCase {
59+
60+
@Override
61+
protected Collection<Class<? extends Plugin>> nodePlugins() {
62+
return Collections.singletonList(MockTransportService.TestPlugin.class);
63+
}
64+
65+
public void testSearchDuringCreate() {
66+
ActionFuture<CreateIndexResponse> createFuture = prepareCreate("test").execute();
67+
68+
for (int i = 0; i < 100; ++i) {
69+
SearchResponse searchResponse = client().prepareSearch("test*").setAllowPartialSearchResults(randomBoolean()).get();
70+
assertThat(searchResponse.getFailedShards(), equalTo(0));
71+
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L));
72+
}
73+
74+
logger.info("done searching");
75+
assertAcked(createFuture.actionGet());
76+
}
77+
78+
public void testDelayIsolatedPrimary() throws Exception {
79+
String[] originalNodes = internalCluster().getNodeNames();
80+
String dataNode = internalCluster().startDataOnlyNode();
81+
String coordinatorNode = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
82+
83+
NetworkDisruption.Bridge bridge = new NetworkDisruption.Bridge(coordinatorNode, Set.of(dataNode), Set.of(originalNodes));
84+
NetworkDisruption scheme = new NetworkDisruption(bridge, new NetworkDisruption.NetworkDelay(NetworkDisruption.NetworkDelay.DEFAULT_DELAY_MIN));
85+
setDisruptionScheme(scheme);
86+
scheme.startDisrupting();
87+
88+
ActionFuture<CreateIndexResponse> createFuture;
89+
try {
90+
Settings.Builder builder = Settings.builder().put(indexSettings())
91+
.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode);
92+
93+
createFuture =
94+
internalCluster().masterClient().admin().indices().create(new CreateIndexRequest("test", builder.build()).timeout(TimeValue.ZERO));
95+
96+
// wait for available on coordinator
97+
assertBusy(() -> {
98+
try {
99+
client(coordinatorNode).get(new GetRequest("test", "0")).actionGet();
100+
throw new IllegalStateException("non-assertion exception to escape assertBusy, get request must fail");
101+
} catch (IndexNotFoundException e) {
102+
throw new AssertionError(e);
103+
} catch (NoShardAvailableActionException e) {
104+
// now coordinator knows about the index.
105+
}
106+
});
107+
108+
// not available on data node.
109+
ElasticsearchException exception = expectThrows(ElasticsearchException.class, () ->
110+
client(dataNode).get(new GetRequest("test", "0")).actionGet());
111+
assertThat(exception, anyOf(instanceOf(NoShardAvailableActionException.class), instanceOf(IndexNotFoundException.class)));
112+
113+
for (String indices : new String[] {"test*", "tes*", "test"}){
114+
logger.info("Searching for [{}]", indices);
115+
SearchResponse searchResponse =
116+
client(coordinatorNode).prepareSearch(indices).setAllowPartialSearchResults(randomBoolean()).get();
117+
assertThat(searchResponse.getFailedShards(), equalTo(0));
118+
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L));
119+
}
120+
} finally {
121+
internalCluster().clearDisruptionScheme(true);
122+
}
123+
createFuture.actionGet();
124+
}
125+
}

server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public static ShardIterator getShards(ClusterState clusterState, ShardId shardId
105105

106106
private static final Map<String, Set<String>> EMPTY_ROUTING = Collections.emptyMap();
107107

108-
private Set<IndexShardRoutingTable> computeTargetedShards(ClusterState clusterState, String[] concreteIndices,
108+
static Set<IndexShardRoutingTable> computeTargetedShards(ClusterState clusterState, String[] concreteIndices,
109109
@Nullable Map<String, Set<String>> routing) {
110110
routing = routing == null ? EMPTY_ROUTING : routing; // just use an empty map
111111
final Set<IndexShardRoutingTable> set = new HashSet<>();
@@ -118,12 +118,19 @@ private Set<IndexShardRoutingTable> computeTargetedShards(ClusterState clusterSt
118118
for (String r : effectiveRouting) {
119119
final int routingPartitionSize = indexMetadata.getRoutingPartitionSize();
120120
for (int partitionOffset = 0; partitionOffset < routingPartitionSize; partitionOffset++) {
121-
set.add(RoutingTable.shardRoutingTable(indexRouting, calculateScaledShardId(indexMetadata, r, partitionOffset)));
121+
IndexShardRoutingTable indexShard = RoutingTable.shardRoutingTable(indexRouting, calculateScaledShardId(indexMetadata, r, partitionOffset));
122+
if (indexShard.primary.active()
123+
|| indexShard.primary.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED) {
124+
set.add(indexShard);
125+
}
122126
}
123127
}
124128
} else {
125129
for (IndexShardRoutingTable indexShard : indexRouting) {
126-
set.add(indexShard);
130+
if (indexShard.primary.active()
131+
|| indexShard.primary.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED) {
132+
set.add(indexShard);
133+
}
127134
}
128135
}
129136
}
@@ -201,15 +208,15 @@ private ShardIterator shardRoutings(IndexShardRoutingTable indexShard, Discovery
201208
}
202209
}
203210

204-
protected IndexRoutingTable indexRoutingTable(ClusterState clusterState, String index) {
211+
protected static IndexRoutingTable indexRoutingTable(ClusterState clusterState, String index) {
205212
IndexRoutingTable indexRouting = clusterState.routingTable().index(index);
206213
if (indexRouting == null) {
207214
throw new IndexNotFoundException(index);
208215
}
209216
return indexRouting;
210217
}
211218

212-
protected IndexMetadata indexMetadata(ClusterState clusterState, String index) {
219+
protected static IndexMetadata indexMetadata(ClusterState clusterState, String index) {
213220
IndexMetadata indexMetadata = clusterState.metadata().index(index);
214221
if (indexMetadata == null) {
215222
throw new IndexNotFoundException(index);

server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java

+71
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,12 @@
2020

2121
import org.elasticsearch.Version;
2222
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
23+
import org.elasticsearch.cluster.ClusterName;
2324
import org.elasticsearch.cluster.ClusterState;
2425
import org.elasticsearch.cluster.metadata.IndexMetadata;
26+
import org.elasticsearch.cluster.metadata.Metadata;
27+
import org.elasticsearch.cluster.node.DiscoveryNode;
28+
import org.elasticsearch.cluster.node.DiscoveryNodes;
2529
import org.elasticsearch.cluster.service.ClusterService;
2630
import org.elasticsearch.common.settings.ClusterSettings;
2731
import org.elasticsearch.common.settings.Settings;
@@ -39,15 +43,22 @@
3943
import java.util.Arrays;
4044
import java.util.HashMap;
4145
import java.util.HashSet;
46+
import java.util.Iterator;
4247
import java.util.List;
4348
import java.util.Map;
4449
import java.util.Set;
4550
import java.util.TreeMap;
51+
import java.util.stream.Collectors;
52+
import java.util.stream.IntStream;
4653

4754
import static org.hamcrest.CoreMatchers.containsString;
55+
import static org.hamcrest.Matchers.allOf;
4856
import static org.hamcrest.Matchers.containsInAnyOrder;
57+
import static org.hamcrest.Matchers.empty;
4958
import static org.hamcrest.Matchers.equalTo;
5059
import static org.hamcrest.Matchers.greaterThan;
60+
import static org.hamcrest.Matchers.hasSize;
61+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
5162
import static org.hamcrest.object.HasToString.hasToString;
5263

5364
public class OperationRoutingTests extends ESTestCase{
@@ -606,4 +617,64 @@ public void testAdaptiveReplicaSelection() throws Exception {
606617
terminate(threadPool);
607618
}
608619

620+
public void testComputeTargetShards() {
621+
DiscoveryNode targetNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
622+
int shards = between(1, 5);
623+
int replicas = between(0, 5);
624+
IndexMetadata indexMetadata = IndexMetadata.builder("test")
625+
.settings(settings(Version.CURRENT))
626+
.numberOfShards(shards).numberOfReplicas(replicas)
627+
.build();
628+
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
629+
routingTableBuilder.addAsNew(indexMetadata);
630+
631+
final ClusterState stateWithUnassigned = ClusterState.builder(ClusterName.DEFAULT)
632+
.nodes(DiscoveryNodes.builder().add(targetNode).build())
633+
.metadata(Metadata.builder().put(indexMetadata, false))
634+
.routingTable(routingTableBuilder.build()).build();
635+
Set<String> routingValues = IntStream.range(0, 5).mapToObj(i -> randomAlphaOfLength(5)).collect(Collectors.toSet());;
636+
assertThat(OperationRouting.computeTargetedShards(stateWithUnassigned, new String[]{"test"}, null), empty());
637+
assertThat(OperationRouting.computeTargetedShards(stateWithUnassigned, new String[]{"test"}, Map.of("test", routingValues)), empty());
638+
assertThat(OperationRouting.computeTargetedShards(stateWithUnassigned, new String[]{"test"}, Map.of("not_test", routingValues)),
639+
empty());
640+
641+
RoutingChangesObserver routingChangesObserver = new RoutingChangesObserver.AbstractRoutingChangesObserver();
642+
RoutingNodes routingNodes = new RoutingNodes(stateWithUnassigned, false);
643+
int initializeCount = between(1, shards);
644+
Set<ShardRouting> initialized = new HashSet<>();
645+
for (RoutingNodes.UnassignedShards.UnassignedIterator iterator = routingNodes.unassigned().iterator(); iterator.hasNext(); ) {
646+
ShardRouting next = iterator.next();
647+
if (next.primary()) {
648+
initialized.add(iterator.initialize(targetNode.getId(), null, 0, routingChangesObserver));
649+
}
650+
if (initialized.size() >= initializeCount) {
651+
break;
652+
}
653+
}
654+
655+
final ClusterState stateWithInitialized = ClusterState.builder(stateWithUnassigned)
656+
.routingTable(new RoutingTable.Builder().updateNodes(stateWithUnassigned.routingTable().version(), routingNodes).build())
657+
.build();
658+
assertThat(OperationRouting.computeTargetedShards(stateWithInitialized, new String[]{"test"}, null), empty());
659+
assertThat(OperationRouting.computeTargetedShards(stateWithInitialized, new String[]{"test"}, Map.of("test", routingValues)),
660+
empty());
661+
assertThat(OperationRouting.computeTargetedShards(stateWithInitialized, new String[]{"test"}, Map.of("not_test", routingValues)),
662+
empty());
663+
664+
RoutingNodes routingNodesWithStarted = new RoutingNodes(stateWithInitialized, false);
665+
Set<ShardRouting> startedShards = new HashSet<>();
666+
for (ShardRouting shardRouting : initialized) {
667+
startedShards.add(routingNodesWithStarted.startShard(logger, shardRouting, routingChangesObserver));
668+
}
669+
670+
final ClusterState stateWithStarted = ClusterState.builder(stateWithInitialized)
671+
.routingTable(new RoutingTable.Builder().updateNodes(stateWithInitialized.routingTable().version(), routingNodesWithStarted)
672+
.build())
673+
.build();
674+
assertThat(OperationRouting.computeTargetedShards(stateWithStarted, new String[]{"test"}, null), hasSize(startedShards.size()));
675+
assertThat(OperationRouting.computeTargetedShards(stateWithStarted, new String[]{"test"}, Map.of("test", routingValues)).size(),
676+
allOf(lessThanOrEqualTo(startedShards.size()), greaterThan(0)));
677+
assertThat(OperationRouting.computeTargetedShards(stateWithStarted, new String[]{"test"}, Map.of("not_test", routingValues)),
678+
hasSize(startedShards.size()));
679+
}
609680
}

0 commit comments

Comments
 (0)