Skip to content

Commit 1b82914

Browse files
committed
Async fetch of shard started and store during allocation
Today, when a primary shard is not allocated we go to all the nodes to find where it is allocated (listing its started state). When we allocate a replica shard, we head to all the nodes and list its store to allocate the replica on a node that holds the closest matching index files to the primary. Those two operations today execute synchronously within the GatewayAllocator, which means they execute in a sync manner within the cluster update thread. For large clusters, or environments with very slow disk, those operations will stall the cluster update thread, making it seem like its stuck. Worse, if the FS is really slow, we timeout after 30s the operation (to not stall the cluster update thread completely). This means that we will have another run for the primary shard if we didn't find one, or we won't find the best node to place a shard since it might have timed out (listing stores need to list all files and read the checksum at the end of each file). On top of that, this sync operation happen one shard at a time, so its effectively compounding the problem in a serial manner the more shards we have and the slower FS is... This change moves to perform both listing the shard started states and the shard stores to an async manner. During the allocation by the GatewayAllocator, if data needs to be fetched from a node, it is done in an async fashion, with the response triggering a reroute to make sure the results will be taken into account. Also, if there are on going operations happening, the relevant shard data will not be taken into account until all the ongoing listing operations are done executing. The execution of listing shard states and stores has been moved to their own respective thread pools (scaling, so will go down to 0 when not needed anymore, unbounded queue, since we don't want to timeout, just let it execute based on how fast the local FS is). This is needed sine we are going to blast nodes with a lot of requests and we need to make sure there is no thread explosion. This change also improves the handling of shard failures coming from a specific node. Today, those nodes were ignored from allocation only for the single reroute round. Now, since fetching is async, we need to keep those failures around at least until a single successful fetch without the node is done, to make sure not to repeat allocating to the failed node all the time. Note, if before the indication of slow allocation was high pending tasks since the allocator was waiting for responses, not the pending tasks will be much smaller. In order to still indicate that the cluster is in the middle of fetching shard data, 2 attributes were added to the cluster health API, indicating the number of ongoing fetches of both started shards and shard store. closes elastic#9502 closes elastic#11101
1 parent 4a8f9ed commit 1b82914

16 files changed

+998
-327
lines changed

src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
5858
int initializingShards = 0;
5959
int unassignedShards = 0;
6060
int numberOfPendingTasks = 0;
61+
int numberOfInFlightFetch = 0;
6162
boolean timedOut = false;
6263
ClusterHealthStatus status = ClusterHealthStatus.RED;
6364
private List<String> validationFailures;
@@ -68,12 +69,14 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
6869

6970
/** needed for plugins BWC */
7071
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState) {
71-
this(clusterName, concreteIndices, clusterState, -1);
72+
this(clusterName, concreteIndices, clusterState, -1, -1);
7273
}
7374

74-
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState, int numberOfPendingTasks) {
75+
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState, int numberOfPendingTasks,
76+
int numberOfInFlightFetch) {
7577
this.clusterName = clusterName;
7678
this.numberOfPendingTasks = numberOfPendingTasks;
79+
this.numberOfInFlightFetch = numberOfInFlightFetch;
7780
RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData());
7881
validationFailures = validation.failures();
7982
numberOfNodes = clusterState.nodes().size();
@@ -167,6 +170,10 @@ public int getNumberOfPendingTasks() {
167170
return this.numberOfPendingTasks;
168171
}
169172

173+
public int getNumberOfInFlightFetch() {
174+
return this.numberOfInFlightFetch;
175+
}
176+
170177
/**
171178
* <tt>true</tt> if the waitForXXX has timeout out and did not match.
172179
*/
@@ -225,6 +232,10 @@ public void readFrom(StreamInput in) throws IOException {
225232
validationFailures.add(in.readString());
226233
}
227234
}
235+
236+
if (in.getVersion().onOrAfter(Version.V_1_6_0)) {
237+
numberOfInFlightFetch = in.readInt();
238+
}
228239
}
229240

230241
@Override
@@ -252,6 +263,10 @@ public void writeTo(StreamOutput out) throws IOException {
252263
for (String failure : validationFailures) {
253264
out.writeString(failure);
254265
}
266+
267+
if (out.getVersion().onOrAfter(Version.V_1_6_0)) {
268+
out.writeInt(numberOfInFlightFetch);
269+
}
255270
}
256271

257272

@@ -275,6 +290,7 @@ static final class Fields {
275290
static final XContentBuilderString NUMBER_OF_NODES = new XContentBuilderString("number_of_nodes");
276291
static final XContentBuilderString NUMBER_OF_DATA_NODES = new XContentBuilderString("number_of_data_nodes");
277292
static final XContentBuilderString NUMBER_OF_PENDING_TASKS = new XContentBuilderString("number_of_pending_tasks");
293+
static final XContentBuilderString NUMBER_OF_IN_FLIGHT_FETCH = new XContentBuilderString("number_of_in_flight_fetch");
278294
static final XContentBuilderString ACTIVE_PRIMARY_SHARDS = new XContentBuilderString("active_primary_shards");
279295
static final XContentBuilderString ACTIVE_SHARDS = new XContentBuilderString("active_shards");
280296
static final XContentBuilderString RELOCATING_SHARDS = new XContentBuilderString("relocating_shards");
@@ -297,6 +313,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
297313
builder.field(Fields.INITIALIZING_SHARDS, getInitializingShards());
298314
builder.field(Fields.UNASSIGNED_SHARDS, getUnassignedShards());
299315
builder.field(Fields.NUMBER_OF_PENDING_TASKS, getNumberOfPendingTasks());
316+
builder.field(Fields.NUMBER_OF_IN_FLIGHT_FETCH, getNumberOfInFlightFetch());
300317

301318
String level = params.param("level", "cluster");
302319
boolean outputIndices = "indices".equals(level) || "shards".equals(level);

src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.inject.Inject;
3232
import org.elasticsearch.common.settings.Settings;
3333
import org.elasticsearch.common.unit.TimeValue;
34+
import org.elasticsearch.gateway.local.LocalGatewayAllocator;
3435
import org.elasticsearch.indices.IndexMissingException;
3536
import org.elasticsearch.threadpool.ThreadPool;
3637
import org.elasticsearch.transport.TransportService;
@@ -41,12 +42,14 @@
4142
public class TransportClusterHealthAction extends TransportMasterNodeReadOperationAction<ClusterHealthRequest, ClusterHealthResponse> {
4243

4344
private final ClusterName clusterName;
45+
private final LocalGatewayAllocator gatewayAllocator;
4446

4547
@Inject
4648
public TransportClusterHealthAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
47-
ClusterName clusterName, ActionFilters actionFilters) {
49+
ClusterName clusterName, ActionFilters actionFilters, LocalGatewayAllocator gatewayAllocator) {
4850
super(settings, ClusterHealthAction.NAME, transportService, clusterService, threadPool, actionFilters);
4951
this.clusterName = clusterName;
52+
this.gatewayAllocator = gatewayAllocator;
5053
}
5154

5255
@Override
@@ -164,12 +167,12 @@ public void onTimeout(TimeValue timeout) {
164167
}
165168

166169
private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) {
167-
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks());
170+
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks(), gatewayAllocator.getNumberOfInFlightFetch());
168171
return prepareResponse(request, response, clusterState, waitFor);
169172
}
170173

171174
private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor, boolean timedOut) {
172-
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks());
175+
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks(), gatewayAllocator.getNumberOfInFlightFetch());
173176
boolean valid = prepareResponse(request, response, clusterState, waitFor);
174177
assert valid || timedOut;
175178
// we check for a timeout here since this method might be called from the wait_for_events
@@ -253,7 +256,7 @@ private boolean prepareResponse(final ClusterHealthRequest request, final Cluste
253256
}
254257

255258

256-
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, ClusterState clusterState, int numberOfPendingTasks) {
259+
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, ClusterState clusterState, int numberOfPendingTasks, int numberOfInFlightFetch) {
257260
if (logger.isTraceEnabled()) {
258261
logger.trace("Calculating health based on state version [{}]", clusterState.version());
259262
}
@@ -262,11 +265,11 @@ private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, Cluste
262265
concreteIndices = clusterState.metaData().concreteIndices(request.indicesOptions(), request.indices());
263266
} catch (IndexMissingException e) {
264267
// one of the specified indices is not there - treat it as RED.
265-
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState, numberOfPendingTasks);
268+
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState, numberOfPendingTasks, numberOfInFlightFetch);
266269
response.status = ClusterHealthStatus.RED;
267270
return response;
268271
}
269272

270-
return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks);
273+
return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks, numberOfInFlightFetch);
271274
}
272275
}

src/main/java/org/elasticsearch/action/support/nodes/NodesOperationResponse.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121

2222
import com.google.common.collect.Maps;
2323
import org.elasticsearch.action.ActionResponse;
24+
import org.elasticsearch.action.FailedNodeException;
2425
import org.elasticsearch.cluster.ClusterName;
26+
import org.elasticsearch.common.Nullable;
2527
import org.elasticsearch.common.io.stream.StreamInput;
2628
import org.elasticsearch.common.io.stream.StreamOutput;
2729

@@ -46,6 +48,14 @@ protected NodesOperationResponse(ClusterName clusterName, NodeResponse[] nodes)
4648
this.nodes = nodes;
4749
}
4850

51+
/**
52+
* The failed nodes, if set to be captured.
53+
*/
54+
@Nullable
55+
public FailedNodeException[] failures() {
56+
return null;
57+
}
58+
4959
public ClusterName getClusterName() {
5060
return this.clusterName;
5161
}

src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.cluster.routing.allocation;
2121

22+
import com.google.common.collect.ImmutableSet;
2223
import org.elasticsearch.cluster.ClusterInfo;
2324
import org.elasticsearch.cluster.metadata.MetaData;
2425
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -217,6 +218,17 @@ public boolean shouldIgnoreShardForNode(ShardId shardId, String nodeId) {
217218
return nodes != null && nodes.contains(nodeId);
218219
}
219220

221+
public Set<String> getIgnoreNodes(ShardId shardId) {
222+
if (ignoredShardToNodes == null) {
223+
return ImmutableSet.of();
224+
}
225+
Set<String> ignore = ignoredShardToNodes.get(shardId);
226+
if (ignore == null) {
227+
return ImmutableSet.of();
228+
}
229+
return ImmutableSet.copyOf(ignore);
230+
}
231+
220232
/**
221233
* Create a routing decision, including the reason if the debug flag is
222234
* turned on

src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocatorModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.common.inject.AbstractModule;
2323
import org.elasticsearch.common.settings.Settings;
24+
import org.elasticsearch.gateway.local.LocalGatewayAllocator;
2425
import org.elasticsearch.gateway.none.NoneGatewayAllocator;
2526

2627
/**
@@ -57,6 +58,7 @@ protected void configure() {
5758
if (shardsAllocator == null) {
5859
shardsAllocator = loadShardsAllocator(settings);
5960
}
61+
bind(LocalGatewayAllocator.class).asEagerSingleton();
6062
bind(GatewayAllocator.class).to(gatewayAllocator).asEagerSingleton();
6163
bind(ShardsAllocator.class).to(shardsAllocator).asEagerSingleton();
6264
}

0 commit comments

Comments
 (0)